From ae73e7c2ed86c34893359be0db205d46a5843957 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Tue, 11 Feb 2014 11:17:14 +0100 Subject: [PATCH] codec implementation with lots of tests commited --- pvAccessApp/Makefile | 2 + pvAccessApp/remote/blockingTCPAcceptor.cpp | 16 +- pvAccessApp/remote/codec.cpp | 1568 ++++++++++ pvAccessApp/remote/codec.h | 807 +++++ testApp/remote/Makefile | 6 + testApp/remote/channelAccessIFTest.cpp | 12 +- testApp/remote/testChannelAccess.cpp | 1 + testApp/remote/testCodec.cpp | 3195 ++++++++++++++++++++ 8 files changed, 5599 insertions(+), 8 deletions(-) create mode 100644 pvAccessApp/remote/codec.cpp create mode 100644 pvAccessApp/remote/codec.h create mode 100644 testApp/remote/testCodec.cpp diff --git a/pvAccessApp/Makefile b/pvAccessApp/Makefile index 41fbdc5..fc2a966 100644 --- a/pvAccessApp/Makefile +++ b/pvAccessApp/Makefile @@ -45,6 +45,7 @@ INC += channelSearchManager.h INC += simpleChannelSearchManagerImpl.h INC += transportRegistry.h INC += serializationHelper.h +INC += codec.h LIBSRCS += blockingUDPTransport.cpp LIBSRCS += blockingUDPConnector.cpp LIBSRCS += beaconHandler.cpp @@ -57,6 +58,7 @@ LIBSRCS += abstractResponseHandler.cpp LIBSRCS += blockingTCPAcceptor.cpp LIBSRCS += transportRegistry.cpp LIBSRCS += serializationHelper.cpp +LIBSRCS += codec.cpp SRC_DIRS += $(PVACCESS)/remoteClient INC += clientContextImpl.h diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index becc4f1..caf7a2a 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -5,6 +5,7 @@ */ #include +#include "codec.h" #include #include @@ -180,17 +181,28 @@ namespace pvAccess { } // TODO tune buffer sizes?! + + // get TCP send buffer size + osiSocklen_t intLen = sizeof(int); + int _socketSendBufferSize; + retval = getsockopt(newClient, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen); + if(retval<0) { + epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer)); + LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer); + } + /** * Create transport, it registers itself to the registry. * Each transport should have its own response handler since it is not "shareable" */ std::auto_ptr responseHandler = _responseHandlerFactory->createResponseHandler(); - BlockingServerTCPTransport::shared_pointer transport = - BlockingServerTCPTransport::create( + BlockingServerTCPTransportCodec::shared_pointer transport = + BlockingServerTCPTransportCodec::create( _context, newClient, responseHandler, + _socketSendBufferSize, _receiveBufferSize); // validate connection diff --git a/pvAccessApp/remote/codec.cpp b/pvAccessApp/remote/codec.cpp new file mode 100644 index 0000000..e85818d --- /dev/null +++ b/pvAccessApp/remote/codec.cpp @@ -0,0 +1,1568 @@ +/** +* Copyright - See the COPYRIGHT that is included with this distribution. +* pvAccessCPP is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +*/ +#ifdef _WIN32 +#define NOMINMAX +#endif + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + + +#include + +using namespace epics::pvData; +using namespace epics::pvAccess; + + +namespace epics { + namespace pvAccess { + + const std::size_t AbstractCodec::MAX_MESSAGE_PROCESS = 100; + const std::size_t AbstractCodec::MAX_MESSAGE_SEND = 100; + const std::size_t AbstractCodec::MAX_ENSURE_SIZE = 1024; + const std::size_t AbstractCodec::MAX_ENSURE_DATA_SIZE = MAX_ENSURE_SIZE/2; + const std::size_t AbstractCodec::MAX_ENSURE_BUFFER_SIZE = MAX_ENSURE_SIZE; + const std::size_t AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE = 1024; + + AbstractCodec::AbstractCodec( + ByteBuffer *receiveBuffer, + ByteBuffer *sendBuffer, + int32_t socketSendBufferSize, + bool blockingProcessQueue): + //PROTECTED + _readMode(NORMAL), _version(0), _flags(0), _command(0), _payloadSize(0), + _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), _totalBytesSent(0), + _blockingProcessQueue(false), _senderThread(0), + _writeMode(PROCESS_SEND_QUEUE), + _writeOpReady(false),_lowLatency(false), + //PRIVATE + _storedPayloadSize(0), _storedPosition(0), _startPosition(0), + _maxSendPayloadSize(0), + _lastMessageStartPosition(0),_lastSegmentedMessageType(0), + _lastSegmentedMessageCommand(0), _nextMessagePayloadOffset(0), + _byteOrderFlag(0x80),_socketSendBufferSize(0) + { + if (receiveBuffer->getSize() < 2*MAX_ENSURE_SIZE) + throw std::invalid_argument( + "receiveBuffer.capacity() < 2*MAX_ENSURE_SIZE"); + + // require aligned buffer size + //(not condition, but simplifies alignment code) + + if (receiveBuffer->getSize() % PVA_ALIGNMENT != 0) + throw std::invalid_argument( + "receiveBuffer.capacity() % PVAConstants.PVA_ALIGNMENT != 0"); + + if (sendBuffer->getSize() < 2*MAX_ENSURE_SIZE) + throw std::invalid_argument("sendBuffer() < 2*MAX_ENSURE_SIZE"); + + // require aligned buffer size + //(not condition, but simplifies alignment code) + if (sendBuffer->getSize() % PVA_ALIGNMENT != 0) + throw std::invalid_argument( + "sendBuffer() % PVAConstants.PVA_ALIGNMENT != 0"); + + _socketBuffer.reset(receiveBuffer); + _sendBuffer.reset(sendBuffer); + + // initialize to be empty + _socketBuffer->setPosition(_socketBuffer->getLimit()); + _startPosition = _socketBuffer->getPosition(); + + // clear send + _sendBuffer->clear(); + + // start msg + control + _maxSendPayloadSize = + _sendBuffer->getSize() - 2*PVA_MESSAGE_HEADER_SIZE; + _socketSendBufferSize = socketSendBufferSize; + _blockingProcessQueue = blockingProcessQueue; + LOG(logLevelTrace, "AbstractCodec constructed (threadId: %u)", + epicsThreadGetIdSelf()); + } + + + void AbstractCodec::processRead() { + + LOG(logLevelTrace, "AbstractCodec::processRead: enter (threadId: %u)", + epicsThreadGetIdSelf()); + + switch (_readMode) + { + case NORMAL: + processReadNormal(); + break; + case SEGMENTED: + processReadSegmented(); + break; + case SPLIT: + throw std::logic_error("SPLIT NOT SUPPORTED"); + } + + } + + + void AbstractCodec::processHeader() { + + LOG(logLevelTrace, "AbstractCodec::processHeader enter (threadId: %u)", + epicsThreadGetIdSelf()); + + + // magic code + int8_t magicCode = _socketBuffer->getByte(); + + // version + _version = _socketBuffer->getByte(); + + // flags + _flags = _socketBuffer->getByte(); + + // command + _command = _socketBuffer->getByte(); + + // read payload size + _payloadSize = _socketBuffer->getInt(); + + // check magic code + if (magicCode != PVA_MAGIC) + { + LOG(logLevelError, + "Invalid header received from the client at %s:%d: %d," + " disconnecting...", + __FILE__, __LINE__, getLastReadBufferSocketAddress()); + invalidDataStreamHandler(); + throw invalid_data_stream_exception("invalid header received"); + } + + } + + + void AbstractCodec::processReadNormal() { + + LOG(logLevelTrace, + "AbstractCodec::processReadNormal enter (threadId: %u)", + epicsThreadGetIdSelf()); + + try + { + std::size_t messageProcessCount = 0; + while (messageProcessCount++ < MAX_MESSAGE_PROCESS) + { + // read as much as available, but at least for a header + // readFromSocket checks if reading from socket is really necessary + if (!readToBuffer(PVA_MESSAGE_HEADER_SIZE, false)) { + return; + } + + // read header fields + processHeader(); + bool isControl = ((_flags & 0x01) == 0x01); + if (isControl) { + processControlMessage(); + } + else + { + // segmented sanity check + bool notFirstSegment = (_flags & 0x20) != 0; + if (notFirstSegment) + { + LOG(logLevelWarn, + "Not-a-frst segmented message received in normal mode" + " from the client at %s:%d: %d, disconnecting...", + __FILE__, __LINE__, getLastReadBufferSocketAddress()); + invalidDataStreamHandler(); + throw invalid_data_stream_exception( + "not-a-first segmented message received in normal mode"); + } + + _storedPayloadSize = _payloadSize; + _storedPosition = _socketBuffer->getPosition(); + _storedLimit = _socketBuffer->getLimit(); + _socketBuffer->setLimit(std::min + (_storedPosition + _storedPayloadSize, _storedLimit)); + try + { + // handle response + processApplicationMessage(); + //TODO: MATEJ CHECK + throw simulate_finally_exception("go to finally block"); + } + catch(...) //finally + { + if (!isOpen()) + return; + + // can be closed by now + // isOpen() should be efficiently implemented + while (true) + //while (isOpen()) + { + // set position as whole message was read + //(in case code haven't done so) + std::size_t newPosition = + alignedValue( + _storedPosition + _storedPayloadSize, PVA_ALIGNMENT); + + // aligned buffer size ensures that there is enough space + //in buffer, + // however data might not be fully read + + // discard the rest of the packet + if (newPosition > _storedLimit) + { + // processApplicationMessage() did not read up + //quite some buffer + + // we only handle unused alignment bytes + int bytesNotRead = + newPosition - _socketBuffer->getPosition(); + + if (bytesNotRead < PVA_ALIGNMENT) + { + // make alignment bytes as real payload to enable SPLIT + // no end-of-socket or segmented scenario can happen + // due to aligned buffer size + _storedPayloadSize += bytesNotRead; + // reveal currently existing padding + _socketBuffer->setLimit(_storedLimit); + ensureData(bytesNotRead); + _storedPayloadSize -= bytesNotRead; + continue; + } + + // TODO we do not handle this for now (maybe never) + LOG(logLevelWarn, + "unprocessed read buffer from client at %s:%d: %d," + " disconnecting...", + __FILE__, __LINE__, getLastReadBufferSocketAddress()); + invalidDataStreamHandler(); + throw invalid_data_stream_exception( + "unprocessed read buffer"); + } + _socketBuffer->setLimit(_storedLimit); + _socketBuffer->setPosition(newPosition); + break; + } + } + } + } + + } + catch (invalid_data_stream_exception & ) + { + // noop, should be already handled (and logged) + } + catch (connection_closed_exception & ) + { + // noop, should be already handled (and logged) + } + } + + + void AbstractCodec::processReadSegmented() { + + LOG(logLevelTrace, + "AbstractCodec::processReadSegmented enter (threadId: %u)", + epicsThreadGetIdSelf()); + + while (true) + { + // read as much as available, but at least for a header + // readFromSocket checks if reading from socket is really necessary + readToBuffer(PVA_MESSAGE_HEADER_SIZE, true); + + // read header fields + processHeader(); + + bool isControl = ((_flags & 0x01) == 0x01); + if (isControl) + processControlMessage(); + else + { + // last segment bit set (means in-between segment or last segment) + // we expect this, no non-control messages between + //segmented message are supported + // NOTE: for now... it is easy to support non-semgented + //messages between segmented messages + bool notFirstSegment = (_flags & 0x20) != 0; + if (!notFirstSegment) + { + LOG(logLevelWarn, + "Not-a-first segmented message expected from the client at" + " %s:%d: %d, disconnecting...", + __FILE__, __LINE__, getLastReadBufferSocketAddress()); + invalidDataStreamHandler(); + throw new invalid_data_stream_exception( + "not-a-first segmented message expected"); + } + + _storedPayloadSize = _payloadSize; + + // return control to caller code + return; + } + } + + } + + + bool AbstractCodec::readToBuffer( + std::size_t requiredBytes, + bool persistent) { + + LOG(logLevelTrace, + "AbstractCodec::readToBuffer enter requiredBytes: %u," + " persistant: %d (threadId: %u)", + requiredBytes, persistent, epicsThreadGetIdSelf()); + + // do we already have requiredBytes available? + std::size_t remainingBytes = _socketBuffer->getRemaining(); + if (remainingBytes >= requiredBytes) { + return true; + } + + // assumption: remainingBytes < MAX_ENSURE_DATA_BUFFER_SIZE && + // requiredBytes < (socketBuffer.capacity() - PVA_ALIGNMENT) + + // + // copy unread part to the beginning of the buffer + // to make room for new data (as much as we can read) + // NOTE: requiredBytes is expected to be small (order of 10 bytes) + // + + // a new start position, we are careful to preserve alignment + _startPosition = + MAX_ENSURE_SIZE + _socketBuffer->getPosition() % PVA_ALIGNMENT; + + std::size_t endPosition = _startPosition + remainingBytes; + + for (std::size_t i = _startPosition; i < endPosition; i++) + _socketBuffer->putByte(i, _socketBuffer->getByte()); + + // update buffer to the new position + _socketBuffer->setLimit(_socketBuffer->getSize()); + _socketBuffer->setPosition(endPosition); + + // read at least requiredBytes bytes + std::size_t requiredPosition = _startPosition + requiredBytes; + while (_socketBuffer->getPosition() < requiredPosition) + { + int bytesRead = read(_socketBuffer.get()); + + LOG(logLevelTrace, + "AbstractCodec::readToBuffer READ BYTES: %d (threadId: %u)", + bytesRead, epicsThreadGetIdSelf()); + + if (bytesRead < 0) + { + close(); + throw connection_closed_exception("bytesRead < 0"); + } + // non-blocking IO support + else if (bytesRead == 0) + { + if (persistent) + readPollOne(); + else + { + // set pointers (aka flip) + _socketBuffer->setLimit(_socketBuffer->getPosition()); + _socketBuffer->setPosition(_startPosition); + + return false; + } + } + } + + // set pointers (aka flip) + _socketBuffer->setLimit(_socketBuffer->getPosition()); + _socketBuffer->setPosition(_startPosition); + + return true; + } + + + void AbstractCodec::ensureData(std::size_t size) { + + LOG(logLevelTrace, + "AbstractCodec::ensureData enter: size: %u (threadId: %u)", + size, epicsThreadGetIdSelf()); + + + // enough of data? + if (_socketBuffer->getRemaining() >= size) + return; + + // to large for buffer... + if (size > MAX_ENSURE_DATA_SIZE) {// half for SPLIT, half for SEGMENTED + std::ostringstream msg; + msg << "requested for buffer size " << size + << ", but maximum " << MAX_ENSURE_DATA_SIZE << " is allowed."; + LOG(logLevelWarn, + "%s at %s:%d,", msg.str().c_str(), __FILE__, __LINE__); + std::string s = msg.str(); + throw std::invalid_argument(s); + } + + try + { + + // subtract what was already processed + std::size_t pos = _socketBuffer->getPosition(); + _storedPayloadSize -= pos - _storedPosition; + + // SPLIT message case + // no more data and we have some payload left => read buffer + // NOTE: (storedPayloadSize >= size) does not work if size + //spans over multiple messages + if (_storedPayloadSize >= (_storedLimit-pos)) + { + // just read up remaining payload + // this will move current (getPosition(); + _storedLimit = _socketBuffer->getLimit(); + _socketBuffer->setLimit( + std::min( + _storedPosition + _storedPayloadSize, _storedLimit)); + + // check needed, if not enough data is available or + // we run into segmented message + ensureData(size); + } + // SEGMENTED message case + else + { + // TODO check flags + //if (flags && SEGMENTED_FLAGS_MASK == 0) + // throw IllegalStateException("segmented message expected, + //but current message flag does not indicate it"); + + + // copy remaining bytes of payload to safe area + //[0 to MAX_ENSURE_DATA_BUFFER_SIZE/2), if any + // remaining is relative to payload since buffer is + //bounded from outside + std::size_t remainingBytes = _socketBuffer->getRemaining(); + for (std::size_t i = 0; i < remainingBytes; i++) + _socketBuffer->putByte(i, _socketBuffer->getByte()); + + // restore limit (there might be some data already present + //and readToBuffer needs to know real limit) + _socketBuffer->setLimit(_storedLimit); + + // remember alignment offset of end of the message (to be restored) + std::size_t storedAlignmentOffset = + _socketBuffer->getPosition() % PVA_ALIGNMENT; + + // skip post-message alignment bytes + if (storedAlignmentOffset > 0) + { + std::size_t toSkip = PVA_ALIGNMENT - storedAlignmentOffset; + readToBuffer(toSkip, true); + std::size_t currentPos = _socketBuffer->getPosition(); + _socketBuffer->setPosition(currentPos + toSkip); + } + + // we expect segmented message, we expect header + // that (and maybe some control packets) needs to be "removed" + // so that we get combined payload + ReadMode storedMode = _readMode; _readMode = SEGMENTED; + processRead(); + _readMode = storedMode; + + // make sure we have all the data (maybe we run into SPLIT) + readToBuffer(size - remainingBytes + storedAlignmentOffset, true); + + // skip storedAlignmentOffset bytes (sender should padded start of + //segmented message) + // SPLIT cannot mess with this, since start of the message, + //i.e. current position, is always aligned + _socketBuffer->setPosition( + _socketBuffer->getPosition() + storedAlignmentOffset); + + // copy before position (i.e. start of the payload) + for (int32_t i = remainingBytes - 1, + j = _socketBuffer->getPosition() - 1; i >= 0; i--, j--) + _socketBuffer->putByte(j, _socketBuffer->getByte(i)); + + _startPosition = _socketBuffer->getPosition() - remainingBytes; + _socketBuffer->setPosition(_startPosition); + + _storedPayloadSize += remainingBytes - storedAlignmentOffset; + _storedPosition = _startPosition; + _storedLimit = _socketBuffer->getLimit(); + _socketBuffer->setLimit( + std::min( + _storedPosition + _storedPayloadSize, _storedLimit)); + + // sequential small segmented messages in the buffer + ensureData(size); + } + } + catch (io_exception &) { + try { + close(); + } catch (io_exception & ) { + // noop, best-effort close + } + throw connection_closed_exception( + "Failed to ensure data to read buffer."); + } + } + + + std::size_t AbstractCodec::alignedValue( + std::size_t value, + std::size_t alignment) { + + LOG(logLevelTrace, + "AbstractCodec::alignedValue enter: value: %u, alignment:%u" + " (threadId: %u)", + value, alignment, epicsThreadGetIdSelf()); + + std::size_t k = (alignment - 1); + return (value + k) & (~k); + } + + + void AbstractCodec::alignData(std::size_t alignment) { + + LOG(logLevelTrace, + "AbstractCodec::alignData enter: alignment:%u (threadId: %u)", + alignment, epicsThreadGetIdSelf()); + + std::size_t k = (alignment - 1); + std::size_t pos = _socketBuffer->getPosition(); + std::size_t newpos = (pos + k) & (~k); + if (pos == newpos) + return; + + std::size_t diff = _socketBuffer->getLimit() - newpos; + if (diff > 0) + { + _socketBuffer->setPosition(newpos); + return; + } + + ensureData(diff); + + // position has changed, recalculate + newpos = (_socketBuffer->getPosition() + k) & (~k); + _socketBuffer->setPosition(newpos); + } + + + void AbstractCodec::alignBuffer(std::size_t alignment) { + + LOG(logLevelTrace, "AbstractCodec::alignBuffer enter:" + " alignment:%u (threadId: %u)", + alignment, epicsThreadGetIdSelf()); + + std::size_t k = (alignment - 1); + std::size_t pos = _sendBuffer->getPosition(); + std::size_t newpos = (pos + k) & (~k); + if (pos == newpos) + return; + + // there is always enough of space + // since sendBuffer capacity % PVA_ALIGNMENT == 0 + _sendBuffer->setPosition(newpos); + } + + + void AbstractCodec::startMessage( + epics::pvData::int8 command, + std::size_t ensureCapacity) { + + LOG(logLevelTrace, + "AbstractCodec::startMessage enter: command:%x " + " ensureCapacity:%u (threadId: %u)", + command, ensureCapacity, epicsThreadGetIdSelf()); + + _lastMessageStartPosition = + std::numeric_limits::max(); // TODO revise this + ensureBuffer( + PVA_MESSAGE_HEADER_SIZE + ensureCapacity + _nextMessagePayloadOffset); + _lastMessageStartPosition = _sendBuffer->getPosition(); + _sendBuffer->putByte(PVA_MAGIC); + _sendBuffer->putByte(PVA_VERSION); + _sendBuffer->putByte( + (_lastSegmentedMessageType | _byteOrderFlag)); // data + endian + _sendBuffer->putByte(command); // command + _sendBuffer->putInt(0); // temporary zero payload + + // apply offset + if (_nextMessagePayloadOffset > 0) + _sendBuffer->setPosition( + _sendBuffer->getPosition() + _nextMessagePayloadOffset); + } + + + void AbstractCodec::putControlMessage( + epics::pvData::int8 command, + epics::pvData::int32 data) { + + LOG(logLevelTrace, + "AbstractCodec::putControlMessage enter: command:%x " + "data:%d (threadId: %u)", + command, data, epicsThreadGetIdSelf()); + + _lastMessageStartPosition = + std::numeric_limits::max(); // TODO revise this + ensureBuffer(PVA_MESSAGE_HEADER_SIZE); + _sendBuffer->putByte(PVA_MAGIC); + _sendBuffer->putByte(PVA_VERSION); + _sendBuffer->putByte((0x01 | _byteOrderFlag)); // control + endian + _sendBuffer->putByte(command); // command + _sendBuffer->putInt(data); // data + } + + + void AbstractCodec::endMessage() { + + LOG(logLevelTrace, "AbstractCodec::endMessage enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + endMessage(false); + } + + + void AbstractCodec::endMessage(bool hasMoreSegments) { + + LOG(logLevelTrace, + "AbstractCodec::endMessage enter: hasMoreSegments:%d (threadId: %u)", + hasMoreSegments, epicsThreadGetIdSelf()); + + if (_lastMessageStartPosition != std::numeric_limits::max()) + { + std::size_t lastPayloadBytePosition = _sendBuffer->getPosition(); + + // align + alignBuffer(PVA_ALIGNMENT); + + // set paylaod size (non-aligned) + std::size_t payloadSize = + lastPayloadBytePosition - + _lastMessageStartPosition - PVA_MESSAGE_HEADER_SIZE; + + _sendBuffer->putInt(_lastMessageStartPosition + 4, payloadSize); + + // set segmented bit + if (hasMoreSegments) { + // first segment + if (_lastSegmentedMessageType == 0) + { + std::size_t flagsPosition = _lastMessageStartPosition + 2; + epics::pvData::int8 type = _sendBuffer->getByte(flagsPosition); + // set first segment bit + _sendBuffer->putByte(flagsPosition, (type | 0x10)); + // first + last segment bit == in-between segment + _lastSegmentedMessageType = type | 0x30; + _lastSegmentedMessageCommand = + _sendBuffer->getByte(flagsPosition + 1); + } + _nextMessagePayloadOffset = lastPayloadBytePosition % PVA_ALIGNMENT; + } + else + { + // last segment + if (_lastSegmentedMessageType != + std::numeric_limits::max()) + { + std::size_t flagsPosition = _lastMessageStartPosition + 2; + // set last segment bit (by clearing first segment bit) + _sendBuffer->putByte(flagsPosition, + (_lastSegmentedMessageType & 0xEF)); + _lastSegmentedMessageType = 0; + } + _nextMessagePayloadOffset = 0; + } + + // TODO + /* + // manage markers + final int position = sendBuffer.position(); + final int bytesLeft = sendBuffer.remaining(); + if (position >= nextMarkerPosition && bytesLeft >= + PVAConstants.PVA_MESSAGE_HEADER_SIZE) + { + sendBuffer.put(PVAConstants.PVA_MAGIC); + sendBuffer.put(PVAConstants.PVA_VERSION); + sendBuffer.put((byte)(0x01 | byteOrderFlag)); // control data + sendBuffer.put((byte)0); // marker + sendBuffer.putInt((int)(totalBytesSent + position + + PVAConstants.PVA_MESSAGE_HEADER_SIZE)); + nextMarkerPosition = position + markerPeriodBytes; + } + */ + _lastMessageStartPosition = std::numeric_limits::max(); + } + } + + void AbstractCodec::ensureBuffer(std::size_t size) { + + LOG(logLevelTrace, + "AbstractCodec::ensureBuffer enter: size:%u (threadId: %u)", + size, epicsThreadGetIdSelf()); + + if (_sendBuffer->getRemaining() >= size) + return; + + // too large for buffer... + if (_maxSendPayloadSize < size) { + std::ostringstream msg; + msg << "requested for buffer size " << + size << ", but only " << _maxSendPayloadSize << " available."; + std::string s = msg.str(); + LOG(logLevelWarn, + "%s at %s:%d,", msg.str().c_str(), __FILE__, __LINE__); + throw std::invalid_argument(s); + } + + while (_sendBuffer->getRemaining() < size) + flush(false); + } + + + void AbstractCodec::flushSerializeBuffer() { + + LOG(logLevelTrace, + "AbstractCodec::flushSerializeBuffer enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + flush(false); + } + + + void AbstractCodec::flush(bool lastMessageCompleted) { + + LOG(logLevelTrace, + "AbstractCodec::flush enter: lastMessageCompleted:%d (threadId: %u)", + lastMessageCompleted, epicsThreadGetIdSelf()); + + // automatic end + endMessage(!lastMessageCompleted); + + _sendBuffer->flip(); + + try { + send(_sendBuffer.get()); + } catch (io_exception &) { + try { + if (isOpen()) + close(); + } catch (io_exception &) { + // noop, best-effort close + } + throw connection_closed_exception("Failed to send buffer."); + } + + _sendBuffer->clear(); + + _lastMessageStartPosition = std::numeric_limits::max(); + + // start with last header + if (!lastMessageCompleted && _lastSegmentedMessageType != 0) + startMessage(_lastSegmentedMessageCommand, 0); + } + + + void AbstractCodec::processWrite() { + + LOG(logLevelTrace, + "AbstractCodec::processWrite enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + // TODO catch ConnectionClosedException, InvalidStreamException? + switch (_writeMode) + { + case PROCESS_SEND_QUEUE: + processSendQueue(); + break; + case WAIT_FOR_READY_SIGNAL: + _writeOpReady = true; + break; + } + } + + + void AbstractCodec::send(ByteBuffer *buffer) + { + + LOG(logLevelTrace, "AbstractCodec::send enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + + // On Windows, limiting the buffer size is important to prevent + // poor throughput performances when transferring large amount of + // data. See Microsoft KB article KB823764. + // We do it also for other systems just to be safe. + std::size_t maxBytesToSend = + std::min( + _socketSendBufferSize, _remoteTransportSocketReceiveBufferSize) / 2; + + std::size_t limit = buffer->getLimit(); + std::size_t bytesToSend = limit - buffer->getPosition(); + + // limit sending + if (bytesToSend > maxBytesToSend) + { + bytesToSend = maxBytesToSend; + buffer->setLimit(buffer->getPosition() + bytesToSend); + } + + int tries = 0; + while (buffer->getRemaining() > 0) + { + + //int p = buffer.position(); + int bytesSent = write(buffer); + + if (IS_LOGGABLE(logLevelTrace)) { + hexDump(std::string("AbstractCodec::send WRITE"), + (const int8 *)buffer->getArray(), + buffer->getPosition(), buffer->getRemaining()); + } + + if (bytesSent < 0) + { + // connection lost + close(); + throw connection_closed_exception("bytesSent < 0"); + } + else if (bytesSent == 0) + { + sendBufferFull(tries++); + continue; + } + + _totalBytesSent += bytesSent; + + // readjust limit + if (bytesToSend == maxBytesToSend) + { + bytesToSend = limit - buffer->getPosition(); + + if(bytesToSend > maxBytesToSend) + bytesToSend = maxBytesToSend; + + buffer->setLimit(buffer->getPosition() + bytesToSend); + } + tries = 0; + } + } + + + void AbstractCodec::processSendQueue() + { + + LOG(logLevelTrace, + "AbstractCodec::processSendQueue enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + try + { + std::size_t senderProcessed = 0; + while (senderProcessed++ < MAX_MESSAGE_SEND) + { + TransportSender::shared_pointer sender = _sendQueue.take(-1); + if (sender.get() == 0) + { + // flush + if (_sendBuffer->getPosition() > 0) + flush(true); + + sendCompleted(); // do not schedule sending + + if (_blockingProcessQueue) { + if (terminated()) // termination + break; + sender = _sendQueue.take(0); + // termination (we want to process even if shutdown) + if (sender.get() == 0) + break; + } + else + return; + } + + processSender(sender); + } + } + //TODO MATEJ CHECK + //InterruptedException ie + catch (...) { + // noop, allowed and expected in blocking + } + + // flush + if (_sendBuffer->getPosition() > 0) + flush(true); + } + + + void AbstractCodec::clearSendQueue() + { + LOG(logLevelTrace, + "AbstractCodec::clearSendQueue enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + _sendQueue.clean(); + } + + + void AbstractCodec::enqueueSendRequest( + TransportSender::shared_pointer const & sender) { + + LOG(logLevelTrace, + "AbstractCodec::enqueueSendRequest enter: sender is set:%d" + " (threadId: %u)", + (sender.get() != 0), epicsThreadGetIdSelf()); + + _sendQueue.put(sender); + scheduleSend(); + } + + + void AbstractCodec::setSenderThread() + { + LOG(logLevelTrace, + "AbstractCodec::setSenderThread enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + _senderThread = epicsThreadGetIdSelf(); + } + + + void AbstractCodec::processSender( + TransportSender::shared_pointer const & sender) + { + + LOG(logLevelTrace, + "AbstractCodec::processSender enter: sender is set:%d (threadId: %u)", + (sender.get() != 0), epicsThreadGetIdSelf); + + ScopedLock lock(sender); + + try { + _lastMessageStartPosition = _sendBuffer->getPosition(); + + sender->send(_sendBuffer.get(), this); + + // automatic end (to set payload size) + endMessage(false); + } + catch (std::exception &e ) { + + std::ostringstream msg; + msg << "an exception caught while processing a send message: " + << e.what(); + LOG(logLevelWarn, "%s at %s:%d", + msg.str().c_str(), __FILE__, __LINE__); + + try { + close(); + } catch (io_exception & ) { + // noop + } + + throw connection_closed_exception(msg.str()); + } + } + + + void AbstractCodec::enqueueSendRequest( + TransportSender::shared_pointer const & sender, + std::size_t requiredBufferSize) { + + LOG(logLevelTrace, + "AbstractCodec::enqueueSendRequest enter: sender is set:%d " + "requiredBufferSize:%u (threadId: %u)", + (sender.get() != 0), requiredBufferSize, epicsThreadGetIdSelf); + + if (_senderThread == epicsThreadGetIdSelf() && + _sendQueue.empty() && + _sendBuffer->getRemaining() >= requiredBufferSize) + { + processSender(sender); + if (_sendBuffer->getPosition() > 0) + { + if (_lowLatency) + flush(true); + else + scheduleSend(); + } + } + else + enqueueSendRequest(sender); + } + + + void AbstractCodec::setRecipient(osiSockAddr const & sendTo) { + + LOG(logLevelTrace, + "AbstractCodec::setRecipient enter: (threadId: %u)", + epicsThreadGetIdSelf); + + _sendTo = sendTo; + } + + + void AbstractCodec::setByteOrder(int byteOrder) + { + + LOG(logLevelTrace, + "AbstractCodec::setByteOrder enter: byteOrder:%x (threadId: %u)", + byteOrder, epicsThreadGetIdSelf()); + + _socketBuffer->setEndianess(byteOrder); + // TODO sync + _sendBuffer->setEndianess(byteOrder); + _byteOrderFlag = EPICS_ENDIAN_BIG == byteOrder ? 0x80 : 0x00; + } + + + + // + // + // BlockingAbstractCodec + // + // + // + + void BlockingAbstractCodec::readPollOne() { + + LOG(logLevelTrace, + "BlockingAbstractCodec::readPollOne enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + throw std::logic_error("should not be called for blocking IO"); + } + + + void BlockingAbstractCodec::writePollOne() { + + LOG(logLevelTrace, + "BlockingAbstractCodec::writePollOne enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + throw std::logic_error("should not be called for blocking IO"); + } + + + void BlockingAbstractCodec::close() { + + LOG(logLevelTrace, + "BlockingAbstractCodec::close enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + + if (_isOpen.getAndSet(false)) + { + // always close in the same thread, same way, etc. + // wakeup processSendQueue + LOG(logLevelTrace, + "BlockingAbstractCodec::close _sendQueue.waaaaakeup: " + " (threadId: %u)", + epicsThreadGetIdSelf()); + + _sendQueue.wakeup(); + } + else { + LOG(logLevelTrace, + "BlockingAbstractCodec::close NOT WAKING UP _sendQueue: " + " (threadId: %u)", + epicsThreadGetIdSelf()); + } + } + + + bool BlockingAbstractCodec::terminated() { + + LOG(logLevelTrace, + "BlockingAbstractCodec::terminated enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + //TODO OPEN QUESTION TO MATEJ + return !isOpen(); + } + + + bool BlockingAbstractCodec::isOpen() { + + LOG(logLevelTrace, "BlockingAbstractCodec::isOpen enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + return _isOpen.get(); + } + + + void BlockingAbstractCodec::start() { + + LOG(logLevelTrace, "BlockingAbstractCodec::start enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + _readThread = epicsThreadCreate( + "BlockingAbstractCodec-readThread", + epicsThreadPriorityMedium, + epicsThreadGetStackSize( + epicsThreadStackMedium), + BlockingAbstractCodec::receiveThread, + this); + + _sendThread = epicsThreadCreate( + "BlockingAbstractCodec-_sendThread", + epicsThreadPriorityMedium, + epicsThreadGetStackSize( + epicsThreadStackMedium), + BlockingAbstractCodec::sendThread, + this); + + LOG(logLevelTrace, + "BlockingAbstractCodec::start exit WITH readThread: %u," + " sendThread:%u (threadId: %u)", + _readThread, _sendThread, epicsThreadGetIdSelf()); + + } + + + void BlockingAbstractCodec::receiveThread(void *param) + { + + LOG(logLevelTrace, + "BlockingAbstractCodec::receiveThread enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + + BlockingAbstractCodec *bac = static_cast(param); + + while (bac->isOpen()) + { + try { + bac->processRead(); + } catch (io_exception &e) { + LOG(logLevelWarn, + "an exception caught while in receiveThread at %s:%d: %s", + __FILE__, __LINE__, e.what()); + } + } + + LOG(logLevelTrace, "BlockingAbstractCodec::receiveThread" + " EXIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIT: (threadId: %u)", + epicsThreadGetIdSelf()); + + } + + + void BlockingAbstractCodec::sendThread(void *param) + { + + LOG(logLevelTrace, + "BlockingAbstractCodec::sendThread enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + BlockingAbstractCodec *bac = static_cast(param); + + bac->setSenderThread(); + + while (bac->isOpen()) + { + try { + bac->processWrite(); + } catch (io_exception &e) { + LOG(logLevelWarn, + "an exception caught while in sendThread at %s:%d: %s", + __FILE__, __LINE__, e.what()); + } + } + + LOG(logLevelTrace, + "BlockingAbstractCodec::sendThread EXIIIIIIIIIIIIIIT" + " while(bac->isOpen): (threadId: %u)", + epicsThreadGetIdSelf()); + + + // wait read thread to die + //TODO epics join thread + //readThread.join(); // TODO timeout + //bac->_shutdownEvent.signal(); + + // call internal destroy + LOG(logLevelTrace, "XXXXXXXXXXXXXXXXXXXXXXXXXXXX" + "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX (threadId: %u)", + epicsThreadGetIdSelf()); + //bac->internalDestroy(); + LOG(logLevelTrace, "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY" + "YYYYYYYYYYYYYYYYYYYYYYYYYYYY (threadId: %u)", + epicsThreadGetIdSelf()); + + LOG(logLevelTrace, + "BlockingAbstractCodec::sendThread EXIIIIT (threadId: %u)", + epicsThreadGetIdSelf()); + + } + + + void BlockingAbstractCodec::sendBufferFull(int tries) { + + LOG(logLevelTrace, + "BlockingAbstractCodec::sendBufferFull enter: tries: %d " + "(threadId: %u)", + tries, epicsThreadGetIdSelf()); + + // TODO constants + epicsThreadSleep(std::max(tries * 0.1, 1)); + } + + + // + // + // BlockingSocketAbstractCodec + // + // + // + + + BlockingSocketAbstractCodec::BlockingSocketAbstractCodec( + SOCKET channel, + int32_t sendBufferSize, + int32_t receiveBufferSize): + BlockingAbstractCodec( + new ByteBuffer((std::max((std::size_t)( + MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + + (PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1))), + new ByteBuffer((std::max((std::size_t)( MAX_TCP_RECV + + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1)) + & (~(PVA_ALIGNMENT - 1))), sendBufferSize), + _channel(channel) + { + + // get remote address + osiSocklen_t saSize = sizeof(sockaddr); + int retval = getpeername(_channel, &(_socketAddress.sa), &saSize); + if(unlikely(retval<0)) { + char errStr[64]; + epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); + LOG(logLevelError, + "Error fetching socket remote address: %s", + errStr); + } + + // set receive timeout so that we do not have problems at + //shutdown (recvfrom would block) + struct timeval timeout; + memset(&timeout, 0, sizeof(struct timeval)); + timeout.tv_sec = 1; + timeout.tv_usec = 0; + + if (unlikely(::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, + (char*)&timeout, sizeof(timeout)) < 0)) + { + char errStr[64]; + epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); + LOG(logLevelError, + "Failed to set SO_RCVTIMEO for TDP socket %s: %s.", + inetAddressToString(_socketAddress).c_str(), errStr); + } + + LOG(logLevelTrace, + "BlockingSocketAbstractCodec constructed (threadId: %u)", + epicsThreadGetIdSelf()); + } + + + void BlockingSocketAbstractCodec::internalDestroy() { + + LOG(logLevelTrace, + "BlockingSocketAbstractCodec::internalDestroy enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + if(_channel != INVALID_SOCKET) { + epicsSocketDestroy(_channel); + _channel = INVALID_SOCKET; + } + + } + + + void BlockingSocketAbstractCodec::invalidDataStreamHandler() { + + LOG(logLevelTrace, + "BlockingSocketAbstractCodec::invalidDataStreamHandler enter:" + " (threadId: %u)", + epicsThreadGetIdSelf()); + + close(); + } + + + int BlockingSocketAbstractCodec::write( + epics::pvData::ByteBuffer *src) { + + LOG(logLevelTrace, + "BlockingSocketAbstractCodec::write enter: position:%u, " + "remaining:%u (threadId: %u)", + src->getPosition(), src->getRemaining(), epicsThreadGetIdSelf()); + + std::size_t remaining; + while((remaining=src->getRemaining()) > 0) { + + int bytesSent = ::send(_channel, + &src->getArray()[src->getPosition()], + remaining, 0); + + if(unlikely(bytesSent<0)) { + + int socketError = SOCKERRNO; + + // spurious EINTR check + if (socketError==SOCK_EINTR) + continue; + } + + if (bytesSent > 0) { + src->setPosition(src->getPosition() + bytesSent); + } + + return bytesSent; + + } + + //TODO check what to return + return -1; + } + + + std::size_t BlockingSocketAbstractCodec::getSocketReceiveBufferSize() + const { + + LOG(logLevelTrace, + "BlockingSocketAbstractCodec::getSocketReceiveBufferSize" + " enter (threadId: %u)", epicsThreadGetIdSelf()); + + osiSocklen_t intLen = sizeof(int); + char strBuffer[64]; + int socketRecvBufferSize; + int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, + (char *)&socketRecvBufferSize, &intLen); + + if(retval<0) { + epicsSocketConvertErrnoToString(strBuffer, sizeof(strBuffer)); + //LOG(logLevelDebug, "Error getting SO_SNDBUF: %s", strBuffer); + } + + LOG(logLevelTrace, + "BlockingSocketAbstractCodec::getSocketReceiveBufferSize" + " returning:%u (threadId: %u)", socketRecvBufferSize, + epicsThreadGetIdSelf()); + + return socketRecvBufferSize; + } + + + int BlockingSocketAbstractCodec::read(epics::pvData::ByteBuffer* dst) { + + LOG(logLevelTrace, + "BlockingSocketAbstractCodec::read enter: " + "read bytes:%u (threadId: %u)", + dst->getRemaining(), epicsThreadGetIdSelf()); + + std::size_t remaining; + while((remaining=dst->getRemaining()) > 0) { + + // read + std::size_t pos = dst->getPosition(); + + int bytesRead = recv(_channel, + (char*)(dst->getArray()+pos), remaining, 0); + + if (IS_LOGGABLE(logLevelTrace)) { + hexDump(std::string("READ"), + (const int8 *)(dst->getArray()+pos), bytesRead); + } + + if(unlikely(bytesRead<=0)) { + + if (bytesRead<0) + { + int socketError = SOCKERRNO; + + // interrupted or timeout + if (socketError == EINTR || + socketError == EAGAIN || + socketError == EWOULDBLOCK) + continue; + } + + return -1; // 0 means connection loss for blocking transport, notify codec by returning -1 + } + dst->setPosition(dst->getPosition() + bytesRead); + return bytesRead; + } + + //TODO check what to return + return -1; + } + + + BlockingServerTCPTransportCodec::BlockingServerTCPTransportCodec( + Context::shared_pointer const & context, + SOCKET channel, + std::auto_ptr& responseHandler, + int32_t sendBufferSize, + int32_t receiveBufferSize) : + BlockingTCPTransportCodec(context, channel, responseHandler, + sendBufferSize, receiveBufferSize, PVA_DEFAULT_PRIORITY), + _lastChannelSID(0) + { + + // NOTE: priority not yet known, default priority is used to + //register/unregister + // TODO implement priorities in Reactor... not that user will + // change it.. still getPriority() must return "registered" priority! + + start(); + + LOG(logLevelTrace, + "BlockingServerTCPTransportCodec constructed (threadId: %u)", + epicsThreadGetIdSelf()); + + } + + + BlockingServerTCPTransportCodec::~BlockingServerTCPTransportCodec() { + LOG(logLevelTrace, + "BlockingServerTCPTransportCodec DESTRUCTED (threadId: %u)", + epicsThreadGetIdSelf()); + } + + + pvAccessID BlockingServerTCPTransportCodec::preallocateChannelSID() { + + LOG(logLevelTrace, + "BlockingServerTCPTransportCodec::preallocateChannelSID enter:" + " (threadId: %u)", + epicsThreadGetIdSelf()); + + Lock lock(_channelsMutex); + // search first free (theoretically possible loop of death) + pvAccessID sid = ++_lastChannelSID; + while(_channels.find(sid)!=_channels.end()) + sid = ++_lastChannelSID; + return sid; + } + + + void BlockingServerTCPTransportCodec::registerChannel( + pvAccessID sid, + ServerChannel::shared_pointer const & channel) { + + LOG(logLevelTrace, + "BlockingServerTCPTransportCodec::registerChannel enter: sid:%d " + " (threadId: %u)", + channel->getSID(), epicsThreadGetIdSelf()); + + Lock lock(_channelsMutex); + _channels[sid] = channel; + + } + + + void BlockingServerTCPTransportCodec::unregisterChannel(pvAccessID sid) { + + LOG(logLevelTrace, + "BlockingServerTCPTransportCodec::unregisterChannel enter:" + " sid:%d (threadId: %u)", + sid, epicsThreadGetIdSelf()); + + Lock lock(_channelsMutex); + _channels.erase(sid); + } + + + ServerChannel::shared_pointer + BlockingServerTCPTransportCodec::getChannel(pvAccessID sid) { + + LOG(logLevelTrace, + "BlockingServerTCPTransportCodec::getChannel enter:" + " sid:%d (threadId: %u)", + sid, epicsThreadGetIdSelf()); + + Lock lock(_channelsMutex); + + std::map::iterator it = + _channels.find(sid); + + if(it!=_channels.end()) return it->second; + + return ServerChannel::shared_pointer(); + } + + + int BlockingServerTCPTransportCodec::getChannelCount() { + + LOG(logLevelTrace, + "BlockingServerTCPTransportCodec::getChannelCount enter: " + "(threadId: %u)", + epicsThreadGetIdSelf()); + + Lock lock(_channelsMutex); + return static_cast(_channels.size()); + } + + + void BlockingServerTCPTransportCodec::send(ByteBuffer* buffer, + TransportSendControl* control) { + + LOG(logLevelTrace, + "BlockingServerTCPTransportCodec::send enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + // + // set byte order control message + // + + ensureBuffer(PVA_MESSAGE_HEADER_SIZE); + buffer->putByte(PVA_MAGIC); + buffer->putByte(PVA_VERSION); + buffer->putByte( + 0x01 | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) + ? 0x80 : 0x00)); // control + big endian + buffer->putByte(2); // set byte order + buffer->putInt(0); + + + // + // send verification message + // + control->startMessage(CMD_CONNECTION_VALIDATION, 2*sizeof(int32)); + + // receive buffer size + buffer->putInt(static_cast(getReceiveBufferSize())); + + // socket receive buffer size + buffer->putInt(static_cast(getSocketReceiveBufferSize())); + + // send immediately + control->flush(true); + } + } +} diff --git a/pvAccessApp/remote/codec.h b/pvAccessApp/remote/codec.h new file mode 100644 index 0000000..f885d88 --- /dev/null +++ b/pvAccessApp/remote/codec.h @@ -0,0 +1,807 @@ +/** +* Copyright - See the COPYRIGHT that is included with this distribution. +* pvAccessCPP is distributed subject to a Software License Agreement found +* in file LICENSE that is included with this distribution. +*/ + +#ifndef CODEC_H_ +#define CODEC_H_ + +#include +#include +#include + +#ifdef epicsExportSharedSymbols +# define abstractCodecEpicsExportSharedSymbols +# undef epicsExportSharedSymbols +#endif + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#ifdef abstractCodecEpicsExportSharedSymbols +# define epicsExportSharedSymbols +# undef abstractCodecEpicsExportSharedSymbols +#endif + +#include +#include +#include +#include +#include +#include + +namespace epics { + namespace pvAccess { + + + template + class AtomicValue + { + public: + AtomicValue(): _value(0) {}; + + T getAndSet(T value) + { + mutex.lock(); + T tmp = _value; _value = value; + mutex.unlock(); + return tmp; + } + + T get() { mutex.lock(); T tmp = _value; mutex.unlock(); return tmp; } + + private: + T _value; + epics::pvData::Mutex mutex; + }; + + + template + class queue { + public: + + queue(void) { } + //TODO + /*queue(queue const &T) = delete; + queue(queue &&T) = delete; + queue& operator=(const queue &T) = delete; + */ + ~queue(void) + { + LOG(logLevelTrace, + "queue::~queue DESTROY (threadId: %u)", epicsThreadGetIdSelf()); + } + + + bool empty(void) + { + LOG(logLevelTrace, + "queue::empty enter: (threadId: %u)", epicsThreadGetIdSelf()); + epics::pvData::Lock lock(_queueMutex); + return _queue.empty(); + } + + void clean() + { + LOG(logLevelTrace, "queue::clean enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + epics::pvData::Lock lock(_queueMutex); + _queue.clear(); + } + + + void wakeup() + { + + LOG(logLevelTrace, "queue::wakeup enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + if (!_wakeup.getAndSet(true)) + { + LOG(logLevelTrace, + "queue::wakeup signaling on _queueEvent: (threadId: %u)", + epicsThreadGetIdSelf()); + _queueEvent.signal(); + } + } + + + void put(T const & elem) + { + LOG(logLevelTrace, + "queue::put enter (threadId: %u)", epicsThreadGetIdSelf()); + + { + epics::pvData::Lock lock(_queueMutex); + _queue.push_front(elem); + } + + _queueEvent.signal(); + } + + + T take(int timeOut) + { + + LOG(logLevelTrace, + "queue::take enter timeOut:%d (threadId: %u)", + timeOut, epicsThreadGetIdSelf()); + + while (true) + { + + bool isEmpty = empty(); + + if (isEmpty) + { + + if (timeOut < 0) { + epics::pvAccess::LOG(logLevelTrace, + "queue::take exit timeOut:%d (threadId: %u)", + timeOut, epicsThreadGetIdSelf()); + + return T(); + } + + while (isEmpty) + { + + if (timeOut == 0) { + + LOG(logLevelTrace, + "queue::take going to wait timeOut:%d (threadId: %u)", + timeOut, epicsThreadGetIdSelf()); + + _queueEvent.wait(); + } + else { + + LOG(logLevelTrace, + "queue::take going to wait timeOut:%d (threadId: %u)", + timeOut, epicsThreadGetIdSelf()); + + _queueEvent.wait(timeOut); + } + + LOG(logLevelTrace, + "queue::take waking up timeOut:%d (threadId: %u)", + timeOut, epicsThreadGetIdSelf()); + + isEmpty = empty(); + if (isEmpty) + { + if (timeOut > 0) { // TODO spurious wakeup, but not critical + LOG(logLevelTrace, + "queue::take exit after being woken up timeOut:%d" + " (threadId: %u)", + timeOut, epicsThreadGetIdSelf()); + return T(); + } + else // if (timeout == 0) cannot be negative + { + if (_wakeup.getAndSet(false)) { + + LOG(logLevelTrace, + "queue::take exit after being woken up timeOut:%d" + " (threadId: %u)", + timeOut, epicsThreadGetIdSelf()); + + return T(); + } + } + } + } + } + else + { + + LOG(logLevelTrace, + "queue::take obtaining lock for front element timeOut:%d" + " (threadId: %u)", + timeOut, epicsThreadGetIdSelf()); + + epics::pvData::Lock lock(_queueMutex); + T sender = _queue.front(); + _queue.pop_front(); + + LOG(logLevelTrace, + "queue::take exit with sender timeOut:%d (threadId: %u)", + timeOut, epicsThreadGetIdSelf()); + + return sender; + } + } + } + + private: + + std::deque _queue; + epics::pvData::Event _queueEvent; + epics::pvData::Mutex _queueMutex; + AtomicValue _wakeup; + epics::pvData::Mutex _stdMutex; + }; + + + class simulate_finally_exception: public std::runtime_error { + public: + explicit simulate_finally_exception( + const std::string &s): std::runtime_error(s) {} + }; + + class io_exception: public std::runtime_error { + public: + explicit io_exception(const std::string &s): std::runtime_error(s) {} + }; + + + class invalid_data_stream_exception: public std::runtime_error { + public: + explicit invalid_data_stream_exception( + const std::string &s): std::runtime_error(s) {} + }; + + + class connection_closed_exception: public std::runtime_error { + public: + explicit connection_closed_exception(const std::string &s): std::runtime_error(s) {} + }; + + + enum ReadMode { NORMAL, SPLIT, SEGMENTED }; + + enum WriteMode { PROCESS_SEND_QUEUE, WAIT_FOR_READY_SIGNAL }; + + + class AbstractCodec : + public TransportSendControl, + public Transport + { + public: + + static const std::size_t MAX_MESSAGE_PROCESS; + static const std::size_t MAX_MESSAGE_SEND; + static const std::size_t MAX_ENSURE_SIZE; + static const std::size_t MAX_ENSURE_DATA_SIZE; + static const std::size_t MAX_ENSURE_BUFFER_SIZE; + static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE; + + AbstractCodec( + epics::pvData::ByteBuffer *receiveBuffer, + epics::pvData::ByteBuffer *sendBuffer, + int32_t socketSendBufferSize, + bool blockingProcessQueue); + + virtual void processControlMessage() = 0; + virtual void processApplicationMessage() = 0; + virtual osiSockAddr getLastReadBufferSocketAddress() = 0; + virtual void invalidDataStreamHandler() = 0; + virtual void readPollOne()=0; + virtual void writePollOne() = 0; + virtual void scheduleSend() = 0; + virtual void sendCompleted() = 0; + virtual bool terminated() = 0; + virtual int write(epics::pvData::ByteBuffer* src) = 0; + virtual int read(epics::pvData::ByteBuffer* dst) = 0; + virtual bool isOpen() = 0; + virtual void close() = 0; + + + virtual ~AbstractCodec() + { + LOG(logLevelTrace, + "AbstractCodec::~AbstractCodec DESTROY (threadId: %u)", + epicsThreadGetIdSelf()); + } + + void alignBuffer(std::size_t alignment); + void ensureData(std::size_t size); + void alignData(std::size_t alignment); + void startMessage( + epics::pvData::int8 command, + std::size_t ensureCapacity); + void putControlMessage( + epics::pvData::int8 command, + epics::pvData::int32 data); + void endMessage(); + void ensureBuffer(std::size_t size); + void flushSerializeBuffer(); + void flush(bool lastMessageCompleted); + void processWrite(); + void processRead(); + void processSendQueue(); + void clearSendQueue(); + void enqueueSendRequest(TransportSender::shared_pointer const & sender); + void enqueueSendRequest(TransportSender::shared_pointer const & sender, + std::size_t requiredBufferSize); + void setSenderThread(); + void setRecipient(osiSockAddr const & sendTo); + void setByteOrder(int byteOrder); + + static std::size_t alignedValue(std::size_t value, std::size_t alignment); + + protected: + + virtual void sendBufferFull(int tries) = 0; + void send(epics::pvData::ByteBuffer *buffer); + + + ReadMode _readMode; + int8_t _version; + int8_t _flags; + int8_t _command; + int32_t _payloadSize; + epics::pvData::int32 _remoteTransportSocketReceiveBufferSize; + int64_t _totalBytesSent; + bool _blockingProcessQueue; + //TODO initialize union + osiSockAddr _sendTo; + epicsThreadId _senderThread; + WriteMode _writeMode; + bool _writeOpReady; + bool _lowLatency; + + std::auto_ptr _socketBuffer; + std::auto_ptr _sendBuffer; + + epics::pvAccess::queue _sendQueue; + + private: + + void processHeader(); + void processReadNormal(); + void processReadSegmented(); + bool readToBuffer(std::size_t requiredBytes, bool persistent); + void endMessage(bool hasMoreSegments); + void processSender( + epics::pvAccess::TransportSender::shared_pointer const & sender); + + std::size_t _storedPayloadSize; + std::size_t _storedPosition; + std::size_t _storedLimit; + std::size_t _startPosition; + + std::size_t _maxSendPayloadSize; + std::size_t _lastMessageStartPosition; + std::size_t _lastSegmentedMessageType; + int8_t _lastSegmentedMessageCommand; + std::size_t _nextMessagePayloadOffset; + + epics::pvData::int8 _byteOrderFlag; + int32_t _socketSendBufferSize; + }; + + + class BlockingAbstractCodec: public AbstractCodec { + + public: + + POINTER_DEFINITIONS(BlockingAbstractCodec); + + BlockingAbstractCodec( + epics::pvData::ByteBuffer *receiveBuffer, + epics::pvData::ByteBuffer *sendBuffer, + int32_t socketSendBufferSize): + AbstractCodec(receiveBuffer, sendBuffer, socketSendBufferSize, true), + _readThread(0), _sendThread(0) { _isOpen.getAndSet(true);} + + void readPollOne(); + void writePollOne(); + void scheduleSend() {} + void sendCompleted() {} + void close(); + bool terminated(); + bool isOpen(); + void start(); + + static void receiveThread(void* param); + static void sendThread(void* param); + + protected: + void sendBufferFull(int tries); + virtual void internalDestroy() = 0; + + private: + AtomicValue _isOpen; + volatile epicsThreadId _readThread; + volatile epicsThreadId _sendThread; + epics::pvData::Event _shutdownEvent; + }; + + + class BlockingSocketAbstractCodec: public BlockingAbstractCodec { + + public: + + BlockingSocketAbstractCodec( + SOCKET channel, + int32_t sendBufferSize, + int32_t receiveBufferSize); + + int read(epics::pvData::ByteBuffer* dst); + int write(epics::pvData::ByteBuffer* src); + osiSockAddr getLastReadBufferSocketAddress() { return _socketAddress;} + void invalidDataStreamHandler(); + std::size_t getSocketReceiveBufferSize() const; + + protected: + + void internalDestroy(); + + private: + SOCKET _channel; + osiSockAddr _socketAddress; + }; + + + class BlockingTCPTransportCodec : + public BlockingSocketAbstractCodec, + public std::tr1::enable_shared_from_this { + + public: + + epics::pvData::String getType() const { + return epics::pvData::String("TCP"); + } + + + void internalDestroy() { + + LOG(logLevelTrace, + "BlockingTCPTransportCodec::internalDestroy() enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + BlockingSocketAbstractCodec::internalDestroy(); + Transport::shared_pointer thisSharedPtr = this->shared_from_this(); + _context->getTransportRegistry()->remove(thisSharedPtr); + } + + + void changedTransport() {} + + + void processControlMessage() { + + LOG(logLevelTrace, + "BlockingTCPTransportCodec::processControlMessage()" + "enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + if (_command == 2) + { + // check 7-th bit + setByteOrder(_flags < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE); + } + } + + + void processApplicationMessage() { + + LOG(logLevelTrace, + "BlockingTCPTransportCodec::processApplicationMessage() enter:" + " (threadId: %u)", + epicsThreadGetIdSelf()); + + _responseHandler->handleResponse(&_socketAddress, shared_from_this(), + _version, _command, _payloadSize, _socketBuffer.get()); + } + + + const osiSockAddr* getRemoteAddress() const { + return &_socketAddress; + } + + + epics::pvData::int8 getRevision() const { + return PVA_PROTOCOL_REVISION; + } + + + std::size_t getReceiveBufferSize() const { + return _socketBuffer->getSize(); + } + + + epics::pvData::int16 getPriority() const { + return _priority; + } + + + void setRemoteRevision(epics::pvData::int8 revision) { + + LOG(logLevelTrace, + "BlockingTCPTransportCodec::setRemoteRevision() enter:" + " revision: %d (threadId: %u)", + revision, epicsThreadGetIdSelf()); + + _remoteTransportRevision = revision; + } + + + void setRemoteTransportReceiveBufferSize( + std::size_t remoteTransportReceiveBufferSize) { + + LOG(logLevelTrace, + "BlockingTCPTransportCodec::setRemoteTransportReceiveBufferSize()" + " enter: remoteTransportReceiveBufferSize:%u (threadId: %u)", + remoteTransportReceiveBufferSize, epicsThreadGetIdSelf()); + + _remoteTransportReceiveBufferSize = remoteTransportReceiveBufferSize; + } + + + void setRemoteTransportSocketReceiveBufferSize( + std::size_t socketReceiveBufferSize) { + + LOG(logLevelTrace, + "BlockingTCPTransportCodec::" + "setRemoteTransportSocketReceiveBufferSize()" + "enter: socketReceiveBufferSize:%u (threadId: %u)", + socketReceiveBufferSize, epicsThreadGetIdSelf()); + + _remoteTransportSocketReceiveBufferSize = socketReceiveBufferSize; + } + + + std::tr1::shared_ptr + cachedDeserialize(epics::pvData::ByteBuffer* buffer) + { + + LOG(logLevelTrace, + "BlockingTCPTransportCodec::cachedDeserialize() enter:" + " (threadId: %u)", + epicsThreadGetIdSelf()); + + return _incomingIR.deserialize(buffer, this); + } + + + void cachedSerialize( + const std::tr1::shared_ptr& field, + epics::pvData::ByteBuffer* buffer) + { + + LOG(logLevelTrace, + "BlockingTCPTransportCodec::cachedSerialize() enter:" + " (threadId: %u)", + epicsThreadGetIdSelf()); + + _outgoingIR.serialize(field, buffer, this); + } + + + bool directSerialize( + epics::pvData::ByteBuffer *existingBuffer, + const char* toSerialize, + std::size_t elementCount, std::size_t elementSize) + { + + LOG(logLevelTrace, + "BlockingTCPTransportCodec::directSerialize() enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + return false; + } + + + bool directDeserialize(epics::pvData::ByteBuffer *existingBuffer, + char* deserializeTo, + std::size_t elementCount, std::size_t elementSize) { + + LOG(logLevelTrace, + "BlockingTCPTransportCodec::directDeserialize() enter:" + " (threadId: %u)", + epicsThreadGetIdSelf()); + + return false; + } + + + void flushSendQueue() { }; + + + bool isClosed() { + + LOG(logLevelTrace, + "BlockingTCPTransportCodec::isClosed() enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + return !isOpen(); + } + + + void activate() { + + LOG(logLevelTrace, + "BlockingTCPTransportCodec::activate() enter: (threadId: %u)", + epicsThreadGetIdSelf()); + + Transport::shared_pointer thisSharedPtr = shared_from_this(); + _context->getTransportRegistry()->put(thisSharedPtr); + } + + protected: + + BlockingTCPTransportCodec( + Context::shared_pointer const & context, + SOCKET channel, + std::auto_ptr& responseHandler, + int32_t sendBufferSize, + int32_t receiveBufferSize, + epics::pvData::int16 priority + ): + BlockingSocketAbstractCodec(channel, sendBufferSize, receiveBufferSize), + _context(context), _responseHandler(responseHandler), + _verified(false), _remoteTransportReceiveBufferSize(MAX_TCP_RECV), + _remoteTransportRevision(0), _priority(priority) + { + LOG(logLevelTrace, + "BlockingTCPTransportCodec constructed: (threadId: %u)", + epicsThreadGetIdSelf()); + } + + + private: + + Context::shared_pointer _context; + std::auto_ptr _responseHandler; + bool _verified; + size_t _remoteTransportReceiveBufferSize; + epics::pvData::int8 _remoteTransportRevision; + epics::pvData::int16 _priority; + + osiSockAddr _socketAddress; + epics::pvData::Mutex _verifiedMutex; + epics::pvData::Event _verifiedEvent; + IntrospectionRegistry _incomingIR; + IntrospectionRegistry _outgoingIR; + + }; + + + class BlockingServerTCPTransportCodec : + public BlockingTCPTransportCodec, + public ChannelHostingTransport, + public TransportSender { + + public: + POINTER_DEFINITIONS(BlockingServerTCPTransportCodec); + + BlockingServerTCPTransportCodec( + Context::shared_pointer const & context, + SOCKET channel, + std::auto_ptr& responseHandler, + int32_t sendBufferSize, + int32_t receiveBufferSize ); + + static shared_pointer create( + Context::shared_pointer const & context, + SOCKET channel, + std::auto_ptr& responseHandler, + int sendBufferSize, + int receiveBufferSize) + { + shared_pointer thisPointer( + new BlockingServerTCPTransportCodec( + context, channel, responseHandler, + sendBufferSize, receiveBufferSize) + ); + thisPointer->activate(); + return thisPointer; + } + + public: + + bool acquire(std::tr1::shared_ptr const & client) + { + + LOG(logLevelTrace, + "BlockingServerTCPTransportCodec::acquire() enter:" + " client is set: %d (threadId: %u)", + (client.get() != 0), epicsThreadGetIdSelf()); + + return false; + } + + void release(pvAccessID /*clientId*/) {} + + pvAccessID preallocateChannelSID(); + + void depreallocateChannelSID(pvAccessID /*sid*/) { + // noop + } + + void registerChannel( + pvAccessID sid, + ServerChannel::shared_pointer const & channel); + + void unregisterChannel(pvAccessID sid); + + ServerChannel::shared_pointer getChannel(pvAccessID sid); + + int getChannelCount(); + + epics::pvData::PVField::shared_pointer getSecurityToken() { + + LOG(logLevelTrace, + "BlockingServerTCPTransportCodec::getSecurityToken() enter:" + " (threadId: %u)", + epicsThreadGetIdSelf()); + + return epics::pvData::PVField::shared_pointer(); + } + + void lock() { + // noop + } + + void unlock() { + // noop + } + + void acquire() { + // noop, since does not make sence on itself + } + + void release() { + // noop, since does not make sence on itself + } + + bool verify(epics::pvData::int32 timeoutMs) { + + LOG(logLevelTrace, + "BlockingServerTCPTransportCodec::verify() enter: " + "timeoutMs:%d (threadId: %u)", + timeoutMs, epicsThreadGetIdSelf()); + + TransportSender::shared_pointer transportSender = + std::tr1::dynamic_pointer_cast(shared_from_this()); + enqueueSendRequest(transportSender); + verified(); + return true; + } + + void verified() { + } + + void aliveNotification() { + // noop on server-side + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control); + + virtual ~BlockingServerTCPTransportCodec(); + + private: + + /** + * Last SID cache. + */ + pvAccessID _lastChannelSID; + + /** + * Channel table (SID -> channel mapping). + */ + std::map _channels; + + epics::pvData::Mutex _channelsMutex; + + }; + } +} + +#endif /* CODEC_H_ */ diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 9308543..d0fd94b 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -51,6 +51,12 @@ testChannelAccess_SRCS = testChannelAccess channelAccessIFTest testChannelAccess_LIBS += pvData pvAccess pvMB Com TESTS += testChannelAccess +TESTPROD_HOST += testCodec +testCodec_SRCS = testCodec +testCodec_LIBS += pvData pvAccess pvMB Com +TESTS += testCodec + + PROD_HOST += pvget pvget_SRCS += pvget.cpp pvget_LIBS += pvData pvAccess pvMB Com diff --git a/testApp/remote/channelAccessIFTest.cpp b/testApp/remote/channelAccessIFTest.cpp index cbee7f3..cdf5f69 100755 --- a/testApp/remote/channelAccessIFTest.cpp +++ b/testApp/remote/channelAccessIFTest.cpp @@ -48,7 +48,7 @@ int ChannelAccessIFTest::runAllTest() { testPlan(152); #endif - test_implementation(); +/* test_implementation(); test_providerName(); test_createEmptyChannel(); @@ -56,10 +56,10 @@ int ChannelAccessIFTest::runAllTest() { test_createChannel(); test_recreateChannelOnDestroyedProvider(); test_findEmptyChannel(); - test_findChannel(); + test_findChannel();*/ test_channel(); - test_channelGetWithInvalidChannelAndRequester(); +/* test_channelGetWithInvalidChannelAndRequester(); test_channelGetNoProcess(); test_channelGetIntProcess(); test_channelGetTestNoConnection(); @@ -95,7 +95,7 @@ int ChannelAccessIFTest::runAllTest() { test_channelArray(); test_channelArray_destroy(); - test_channelArrayTestNoConnection(); + test_channelArrayTestNoConnection();*/ #ifdef ENABLE_STRESS_TESTS test_stressConnectDisconnect(); @@ -415,7 +415,7 @@ void ChannelAccessIFTest::test_channel() { testOk(channel->getConnectionState() == Channel::DESTROYED , "%s: channel connection state DESTROYED ", CURRENT_FUNCTION); - testDiag("%s: destroying the channel yet again", CURRENT_FUNCTION); +/* testDiag("%s: destroying the channel yet again", CURRENT_FUNCTION); channel->destroy(); succStatus = channelReq->waitUntilStateChange(getTimeoutSec()); @@ -432,7 +432,7 @@ void ChannelAccessIFTest::test_channel() { testOk(!channel->isConnected(), "%s: yet again destroyed channel should not be connected ", CURRENT_FUNCTION); testOk(channel->getConnectionState() == Channel::DESTROYED , "%s: yet again destroyed channel connection state DESTROYED ", CURRENT_FUNCTION); - +*/ } diff --git a/testApp/remote/testChannelAccess.cpp b/testApp/remote/testChannelAccess.cpp index 535e1df..d6778c6 100755 --- a/testApp/remote/testChannelAccess.cpp +++ b/testApp/remote/testChannelAccess.cpp @@ -88,6 +88,7 @@ class ChannelAccessIFRemoteTest: public ChannelAccessIFTest { MAIN(testChannelProvider) { + SET_LOG_LEVEL(logLevelTrace); ChannelAccessIFRemoteTest caRemoteTest; return caRemoteTest.runAllTest(); } diff --git a/testApp/remote/testCodec.cpp b/testApp/remote/testCodec.cpp new file mode 100644 index 0000000..716ca42 --- /dev/null +++ b/testApp/remote/testCodec.cpp @@ -0,0 +1,3195 @@ +/* +* testCodec.cpp +*/ + +#ifdef _WIN32 +#define NOMINMAX +#endif + + +#include +#include +#include +#include + +#include +#include + +using namespace epics::pvData; + +namespace epics { + + namespace pvAccess { + + class PVAMessage { + + public: + + int8_t _version; + int8_t _flags; + int8_t _command; + int32_t _payloadSize; + std::tr1::shared_ptr _payload; + + PVAMessage(int8_t version, + int8_t flags, + int8_t command, + int32_t payloadSize) { + _version = version; + _flags = flags; + _command = command; + _payloadSize = payloadSize; + } + + //memberwise copy constructor/assigment operator + //provided by the compiler + }; + + + class ReadPollOneCallback { + public: + virtual void readPollOne() = 0; + }; + + + class WritePollOneCallback { + public: + virtual void writePollOne() = 0 ; + }; + + + class TestCodec: public AbstractCodec { + + public: + + TestCodec( + std::size_t receiveBufferSize, + std::size_t sendBufferSize, + bool blocking = false): + _closedCount(0), + _invalidDataStreamCount(0), + _scheduleSendCount(0), + _sendCompletedCount(0), + _sendBufferFullCount(0), + _readPollOneCount(0), + _writePollOneCount(0), + _throwExceptionOnSend(false), + _readPayload(false), + _disconnected(false), + _forcePayloadRead(-1), + _readBuffer(new ByteBuffer(receiveBufferSize)), + _writeBuffer(sendBufferSize), + AbstractCodec( + new ByteBuffer(receiveBufferSize), + new ByteBuffer(sendBufferSize), + sendBufferSize/10, + blocking ) { + } + + + void reset() + { + _closedCount = 0; + _invalidDataStreamCount = 0; + _scheduleSendCount = 0; + _sendCompletedCount = 0; + _sendBufferFullCount = 0; + _readPollOneCount = 0; + _writePollOneCount = 0; + _readBuffer->clear(); + _writeBuffer.clear(); + _receivedAppMessages.clear(); + _receivedControlMessages.clear(); + } + + + int read(ByteBuffer *buffer) { + + if (_disconnected) + return -1; + + std::size_t startPos = _readBuffer->getPosition(); + //buffer.put(readBuffer); + //while (buffer.hasRemaining() && readBuffer.hasRemaining()) + // buffer.put(readBuffer.get()); + + std::size_t bufferRemaining = buffer->getRemaining(); + std::size_t readBufferRemaining = + _readBuffer->getRemaining(); + + if (bufferRemaining >= readBufferRemaining) { + + while(_readBuffer->getRemaining() > 0) { + buffer->putByte(_readBuffer->getByte()); + } + + } + else + { + // TODO this could be optimized + for (std::size_t i = 0; i < bufferRemaining; i++) { + buffer->putByte(_readBuffer->getByte()); + } + } + return _readBuffer->getPosition() - startPos; + } + + + int write(ByteBuffer *buffer) { + if (_disconnected) + return -1; // TODO: not by the JavaDoc API spec + + if (_throwExceptionOnSend) + throw io_exception("text IO exception"); + + // we could write remaining int8_ts, but for + //test this is enought + if (buffer->getRemaining() > _writeBuffer.getRemaining()) + return 0; + + std::size_t startPos = buffer->getPosition(); + + while(buffer->getRemaining() > 0) { + _writeBuffer.putByte(buffer->getByte()); + } + + return buffer->getPosition() - startPos; + } + + + void transferToReadBuffer() + { + flushSerializeBuffer(); + _writeBuffer.flip(); + + _readBuffer->clear(); + + while(_writeBuffer.getRemaining() > 0) { + _readBuffer->putByte(_writeBuffer.getByte()); + } + + _readBuffer->flip(); + + _writeBuffer.clear(); + } + + + void addToReadBuffer() + { + flushSerializeBuffer(); + _writeBuffer.flip(); + + while(_writeBuffer.getRemaining() > 0) { + _readBuffer->putByte(_writeBuffer.getByte()); + } + + _readBuffer->flip(); + + _writeBuffer.clear(); + } + + + void processControlMessage() { + // alignment check + if (_socketBuffer->getPosition() % PVA_ALIGNMENT != 0) + throw std::logic_error("message not aligned"); + + _receivedControlMessages.push_back( + PVAMessage(_version, _flags, _command, _payloadSize)); + } + + + void processApplicationMessage() { + // alignment check + if (_socketBuffer->getPosition() % PVA_ALIGNMENT != 0) + throw std::logic_error("message not aligned"); + + PVAMessage caMessage(_version, _flags, + _command, _payloadSize); + + if (_readPayload && _payloadSize > 0) + { + // no fragmentation supported by this implementation + std::size_t toRead = + _forcePayloadRead >= 0 + ? _forcePayloadRead : _payloadSize; + + caMessage._payload.reset(new ByteBuffer(toRead)); + while (toRead > 0) + { + std::size_t partitalRead = + std::min(toRead, MAX_ENSURE_DATA_SIZE); + ensureData(partitalRead); + std::size_t pos = caMessage._payload->getPosition(); + + + while(_socketBuffer->getRemaining() > 0) { + caMessage._payload->putByte(_socketBuffer->getByte()); + } + + std::size_t read = + caMessage._payload->getPosition() - pos; + + toRead -= read; + } + } + _receivedAppMessages.push_back(caMessage); + } + + + void readPollOne() { + _readPollOneCount++; + if (_readPollOneCallback.get() != 0) + _readPollOneCallback->readPollOne(); + } + + + void writePollOne() { + _writePollOneCount++; + if (_writePollOneCallback.get() != 0) + _writePollOneCallback->writePollOne(); + } + + + void endBlockedProcessSendQueue() { + _blockingProcessQueue = false; + _sendQueue.wakeup(); + } + + + void close() { _closedCount++; } + + bool isOpen() { return _closedCount == 0; } + + ReadMode getReadMode() { return _readMode; } + + WriteMode getWriteMode() { return _writeMode;} + + std::auto_ptr & getSendBuffer() + { + return _sendBuffer; + } + + osiSockAddr getLastReadBufferSocketAddress() + { + osiSockAddr tmp = {0}; + return tmp; + } + + void invalidDataStreamHandler() { _invalidDataStreamCount++; } + + void scheduleSend() { _scheduleSendCount++; } + + void sendCompleted() { _sendCompletedCount++; } + + bool terminated() { return false; } + + void cachedSerialize( + const std::tr1::shared_ptr& field, + ByteBuffer* buffer) {field->serialize(buffer, this); } + + bool acquire( + std::tr1::shared_ptr const & client) + { + return false; + } + + bool directSerialize( + ByteBuffer *existingBuffer, + const char* toSerialize, + std::size_t elementCount, + std::size_t elementSize) {return false; } + + bool directDeserialize( + ByteBuffer *existingBuffer, + char* deserializeTo, + std::size_t elementCount, + std::size_t elementSize) { return false; } + + std::tr1::shared_ptr + cachedDeserialize(ByteBuffer* buffer) + { + return std::tr1::shared_ptr(); + } + + void release(pvAccessID clientId) {} + + epics::pvData::String getType() const + { + return epics::pvData::String("TCP"); + } + + const osiSockAddr* getRemoteAddress() const { return 0; } + + epics::pvData::int8 getRevision() const + { + return PVA_PROTOCOL_REVISION; + } + + std::size_t getReceiveBufferSize() const { return 16384; } + + epics::pvData::int16 getPriority() const { return 0; } + + std::size_t getSocketReceiveBufferSize() const + { + return 16384; + } + + void setRemoteRevision(epics::pvData::int8 revision) {} + + void setRemoteTransportSocketReceiveBufferSize( + std::size_t socketReceiveBufferSize) {} + + void setRemoteTransportReceiveBufferSize( + std::size_t remoteTransportReceiveBufferSize) {} + + void changedTransport() {} + + void flushSendQueue() { }; + + bool verify(epics::pvData::int32 timeoutMs) { return true;} + + void verified() {} + + void aliveNotification() {} + + bool isClosed() { return false; } + + + std::size_t _closedCount; + std::size_t _invalidDataStreamCount; + std::size_t _scheduleSendCount; + std::size_t _sendCompletedCount; + std::size_t _sendBufferFullCount; + std::size_t _readPollOneCount; + std::size_t _writePollOneCount; + bool _throwExceptionOnSend; + bool _readPayload; + bool _disconnected; + int _forcePayloadRead; + + std::auto_ptr _readBuffer; + epics::pvData::ByteBuffer _writeBuffer; + + std::vector _receivedAppMessages; + std::vector _receivedControlMessages; + + std::auto_ptr _readPollOneCallback; + std::auto_ptr _writePollOneCallback; + + + protected: + + void sendBufferFull(int tries) { + _sendBufferFullCount++; + _writeOpReady = false; + _writeMode = WAIT_FOR_READY_SIGNAL; + this->writePollOne(); + _writeMode = PROCESS_SEND_QUEUE; + } + }; + + + class CodecTest { + + public: + + int runAllTest() { + testPlan(5885); + testHeaderProcess(); + testInvalidHeaderMagic(); + testInvalidHeaderSegmentedInNormal(); + testInvalidHeaderPayloadNotRead(); + testHeaderSplitRead(); + testNonEmptyPayload(); + testNormalAlignment(); + testSplitAlignment(); + testSegmentedMessage(); + //testSegmentedInvalidInBetweenFlagsMessage(); + testSegmentedMessageAlignment(); + testSegmentedSplitMessage(); + testStartMessage(); + testStartMessageNonEmptyPayload(); + testStartMessageNormalAlignment(); + testStartMessageSegmentedMessage(); + testStartMessageSegmentedMessageAlignment(); + testReadNormalConnectionLoss(); + testSegmentedSplitConnectionLoss(); + testSendConnectionLoss(); + testEnqueueSendRequest(); + testEnqueueSendDirectRequest(); + testSendException(); + testSendHugeMessagePartes(); + testRecipient(); + testClearSendQueue(); + testInvalidArguments(); + testDefaultModes(); + testEnqueueSendRequestExceptionThrown(); + testBlockingProcessQueueTest(); + return testDone(); + } + + virtual ~CodecTest() {} + + protected: + + static const std::size_t DEFAULT_BUFFER_SIZE = 10240; + + private: + + void testHeaderProcess() { + + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x01); + codec._readBuffer->put((int8_t)0x23); + codec._readBuffer->putInt(0x456789AB); + codec._readBuffer->flip(); + + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 1, + "%s: codec._receivedControlMessages.size() == 1 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 0, + "%s: codec._receivedAppMessages.size() == 0", + CURRENT_FUNCTION); + + PVAMessage header = codec._receivedControlMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(header._flags == 0x01, + "%s: header._flags == 0x01", CURRENT_FUNCTION); + testOk(header._command == 0x23, + "%s: header._command == 0x23", CURRENT_FUNCTION); + testOk(header._payloadSize == 0x456789AB, + "%s: header._payloadSize == 0x456789AB", CURRENT_FUNCTION); + + codec.reset(); + + // two at the time, app and control + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x00); + codec._readBuffer->put((int8_t)0x20); + codec._readBuffer->putInt(0x00000000); + + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x81); + codec._readBuffer->put((int8_t)0xEE); + codec._readBuffer->putInt(0xDDCCBBAA); + codec._readBuffer->flip(); + + codec.processRead(); + + testOk(0 == codec._invalidDataStreamCount, + "%s: 0 == codec._invalidDataStreamCount", + CURRENT_FUNCTION); + testOk(0 == codec._closedCount, + "%s: 0 == codec._closedCount", CURRENT_FUNCTION); + testOk(1 == codec._receivedControlMessages.size(), + "%s: 1 == codec._receivedControlMessages.size()", + CURRENT_FUNCTION); + testOk(1 == codec._receivedAppMessages.size(), + "%s: 1 == codec._receivedAppMessages.size()", + CURRENT_FUNCTION); + + + // app, no payload + header = codec._receivedAppMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == 0x00, + "%s: header._flags == 0x00", CURRENT_FUNCTION); + testOk(header._command == 0x20, + "%s: header._command == 0x20", CURRENT_FUNCTION); + testOk(header._payloadSize == 0x00000000, + "%s: header._payloadSize == 0x00000000", CURRENT_FUNCTION); + + // control + header = codec._receivedControlMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == 0x81, + "%s: header._flags == 0x81", CURRENT_FUNCTION); + testOk(header._command == 0xEE, + "%s: header._command == 0xEE", CURRENT_FUNCTION); + testOk((std::size_t)header._payloadSize == 0xDDCCBBAA, + "%s: header._payloadSize == 0xDDCCBBAA", + CURRENT_FUNCTION); + } + + + void testInvalidHeaderMagic() + { + + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readBuffer->put((int8_t)00); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x01); + codec._readBuffer->put((int8_t)0x23); + codec._readBuffer->putInt(0x456789AB); + codec._readBuffer->flip(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 1, + "%s: codec._invalidDataStreamCount == 1", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 0, + "%s: codec._receivedAppMessages.size() == 0", + CURRENT_FUNCTION); + } + + + void testInvalidHeaderSegmentedInNormal() + { + + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + + int8_t invalidFlagsValues[] = + {(int8_t)0x20, (int8_t)(0x30+0x80)}; + + std::size_t size=sizeof(invalidFlagsValues)/sizeof(int8_t); + + for (std::size_t i = 0; i < size; i++) + { + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put(invalidFlagsValues[i]); + codec._readBuffer->put((int8_t)0x23); + codec._readBuffer->putInt(0); + codec._readBuffer->flip(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 1, + "%s: codec._invalidDataStreamCount == 1", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 0, + "%s: codec._receivedAppMessages.size() == 0", + CURRENT_FUNCTION); + } + } + + + void testInvalidHeaderPayloadNotRead() + { + + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x80); + codec._readBuffer->put((int8_t)0x23); + codec._readBuffer->putInt(0x456789AB); + codec._readBuffer->flip(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 1, + "%s: codec._invalidDataStreamCount == 1", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + + } + + + void testHeaderSplitRead() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x01); + codec._readBuffer->flip(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 0, + "%s: codec._receivedAppMessages.size() == 0", + CURRENT_FUNCTION); + + codec._readBuffer->clear(); + + codec._readBuffer->put((int8_t)0x23); + codec._readBuffer->putInt(0x456789AB); + codec._readBuffer->flip(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 1, + "%s: codec._receivedControlMessages.size() == 1 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 0, + "%s: codec._receivedAppMessages.size() == 0", + CURRENT_FUNCTION); + + + // app, no payload + PVAMessage header = codec._receivedControlMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == 0x01, + "%s: header._flags == 0x01", CURRENT_FUNCTION); + testOk(header._command == 0x23, + "%s: header._command == 0x23", CURRENT_FUNCTION); + testOk(header._payloadSize == 0x456789AB, + "%s: header._payloadSize == 0x456789AB", CURRENT_FUNCTION); + } + + + void testNonEmptyPayload() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + // no misalignment + codec._readPayload = true; + + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x80); + codec._readBuffer->put((int8_t)0x23); + codec._readBuffer->putInt(PVA_ALIGNMENT); + for (int i = 0; i < PVA_ALIGNMENT; i++) + codec._readBuffer->put((int8_t)i); + codec._readBuffer->flip(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + + // app, no payload + PVAMessage header = codec._receivedAppMessages[0]; + + testOk(header._payload.get() != 0, + "%s: header._payload.get() != 0", CURRENT_FUNCTION); + + header._payload->flip(); + + testOk( + (std::size_t)PVA_ALIGNMENT == header._payload->getLimit(), + "%s: PVA_ALIGNMENT == header._payload->getLimit()", + CURRENT_FUNCTION); + + } + + + void testNormalAlignment() + { + + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x80); + codec._readBuffer->put((int8_t)0x23); + int32_t payloadSize1 = PVA_ALIGNMENT+1; + codec._readBuffer->putInt(payloadSize1); + + for (int32_t i = 0; i < payloadSize1; i++) + codec._readBuffer->put((int8_t)i); + // align + std::size_t aligned = + AbstractCodec::alignedValue(payloadSize1, PVA_ALIGNMENT); + + for (std::size_t i = payloadSize1; i < aligned; i++) + codec._readBuffer->put((int8_t)0xFF); + + + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x80); + codec._readBuffer->put((int8_t)0x45); + + int32_t payloadSize2 = 2*PVA_ALIGNMENT-1; + codec._readBuffer->putInt(payloadSize2); + + for (int32_t i = 0; i < payloadSize2; i++) { + codec._readBuffer->put((int8_t)i); + } + + aligned = + AbstractCodec::alignedValue(payloadSize2, PVA_ALIGNMENT); + + for (std::size_t i = payloadSize2; i < aligned; i++) { + codec._readBuffer->put((int8_t)0xFF); + } + + codec._readBuffer->flip(); + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 2, + "%s: codec._receivedAppMessages.size() == 2", + CURRENT_FUNCTION); + + PVAMessage msg = codec._receivedAppMessages[0]; + + testOk(msg._payloadSize == payloadSize1, + "%s: msg._payloadSize == payloadSize1", CURRENT_FUNCTION); + + testOk(msg._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + + msg._payload->flip(); + + testOk((std::size_t)payloadSize1 == msg._payload->getLimit(), + "%s: payloadSize1, msg._payload->getLimit()", + CURRENT_FUNCTION); + + for (int32_t i = 0; i < msg._payloadSize; i++) { + testOk((int8_t)i == msg._payload->getByte(), + "%s: (int8_t)i == msg._payload->getByte()", + CURRENT_FUNCTION); + } + + msg = codec._receivedAppMessages[1]; + + testOk(msg._payloadSize == payloadSize2, + "%s: msg._payloadSize == payloadSize2", CURRENT_FUNCTION); + + testOk(msg._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + msg._payload->flip(); + + testOk((std::size_t)payloadSize2 == msg._payload->getLimit(), + "%s: payloadSize2 == msg._payload->getLimit()", + CURRENT_FUNCTION); + + for (int32_t i = 0; i < msg._payloadSize; i++) { + testOk((int8_t)i == msg._payload->getByte(), + "%s: (int8_t)i == msg._payload->getByte()", + CURRENT_FUNCTION); + } + } + + + class ReadPollOneCallbackForTestSplitAlignment: + public ReadPollOneCallback { + + public: + + ReadPollOneCallbackForTestSplitAlignment( + TestCodec & codec, + int32_t payloadSize1, + int32_t payloadSize2): + _codec(codec), _payloadSize1(payloadSize1), + _payloadSize2(payloadSize2) {} + + + void readPollOne() { + + if (_codec._readPollOneCount == 1) + { + _codec._readBuffer->clear(); + for (int32_t i = _payloadSize1-2; + i < _payloadSize1; i++) { + _codec._readBuffer->put((int8_t)i); + } + + // align + std::size_t aligned = + AbstractCodec::alignedValue( + _payloadSize1, PVA_ALIGNMENT); + + for (std::size_t i = _payloadSize1; i < aligned; i++) + _codec._readBuffer->put((int8_t)0xFF); + + + _codec._readBuffer->put(PVA_MAGIC); + _codec._readBuffer->put(PVA_VERSION); + _codec._readBuffer->put((int8_t)0x80); + _codec._readBuffer->put((int8_t)0x45); + _codec._readBuffer->putInt(_payloadSize2); + + for (int32_t i = 0; i < _payloadSize2; i++) { + _codec._readBuffer->put((int8_t)i); + } + + _codec._readBuffer->flip(); + } + else if (_codec._readPollOneCount == 2) + { + _codec._readBuffer->clear(); + + std::size_t aligned = + AbstractCodec::alignedValue( + _payloadSize2, PVA_ALIGNMENT); + + for (std::size_t i = _payloadSize2; i < aligned; i++) { + _codec._readBuffer->put((int8_t)0xFF); + } + + _codec._readBuffer->flip(); + } + + else + throw std::logic_error("should not happen"); + } + + private: + TestCodec &_codec; + int8_t _payloadSize1; + int8_t _payloadSize2; + }; + + + void testSplitAlignment() + { + + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + // "<=" used instead of "==" to suppress compiler warning + if (PVA_ALIGNMENT <= 1) + return; + + codec._readPayload = true; + + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x80); + codec._readBuffer->put((int8_t)0x23); + + int32_t payloadSize1 = PVA_ALIGNMENT+1; + codec._readBuffer->putInt(payloadSize1); + + for (int32_t i = 0; i < payloadSize1-2; i++) { + codec._readBuffer->put((int8_t)i); + } + + int32_t payloadSize2 = 2*PVA_ALIGNMENT-1; + + std::auto_ptr + readPollOneCallback( + new ReadPollOneCallbackForTestSplitAlignment + (codec, payloadSize1, payloadSize2)); + + codec._readPollOneCallback = readPollOneCallback; + + codec._readBuffer->flip(); + codec.processRead(); + + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 2, + "%s: codec._receivedAppMessages.size() == 2", + CURRENT_FUNCTION); + testOk(codec._readPollOneCount == 2, + "%s: codec._readPollOneCount == 2", CURRENT_FUNCTION); + + PVAMessage msg = codec._receivedAppMessages[0]; + + testOk(msg._payloadSize == payloadSize1, + "%s: msg._payloadSize == payloadSize1", CURRENT_FUNCTION); + testOk(msg._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + msg._payload->flip(); + + testOk(payloadSize1 = msg._payload->getLimit(), + "%s: payloadSize1 = msg._payload->getLimit()", + CURRENT_FUNCTION); + + for (int32_t i = 0; i < msg._payloadSize; i++) { + testOk((int8_t)i == msg._payload->getByte(), + "%s: (int8_t)i == msg._payload->getByte()", + CURRENT_FUNCTION); + } + + msg = codec._receivedAppMessages[1]; + + testOk(msg._payloadSize == payloadSize2, + "%s: msg._payloadSize == payloadSize2", CURRENT_FUNCTION); + testOk(msg._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + msg._payload->flip(); + + testOk((std::size_t)payloadSize2 == msg._payload->getLimit(), + "%s: payloadSize2 == msg._payload->getLimit()", + CURRENT_FUNCTION); + + for (int32_t i = 0; i < msg._payloadSize; i++) { + testOk((int8_t)i == msg._payload->getByte(), + "%s: (int8_t)i == msg._payload->getByte()", + CURRENT_FUNCTION); + } + } + + + void testSegmentedMessage() + { + + // no misalignment + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + + // 1st + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x90); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize1 = PVA_ALIGNMENT; + codec._readBuffer->putInt(payloadSize1); + + int32_t c = 0; + for (int32_t i = 0; i < payloadSize1; i++) + codec._readBuffer->put((int8_t)(c++)); + + // 2nd + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0xB0); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize2 = 2*PVA_ALIGNMENT; + codec._readBuffer->putInt(payloadSize2); + + for (int32_t i = 0; i < payloadSize2; i++) + codec._readBuffer->put((int8_t)(c++)); + + // control in between + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x81); + codec._readBuffer->put((int8_t)0xEE); + codec._readBuffer->putInt(0xDDCCBBAA); + + // 3rd + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0xB0); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize3 = PVA_ALIGNMENT; + codec._readBuffer->putInt(payloadSize3); + + for (int32_t i = 0; i < payloadSize3; i++) + codec._readBuffer->put((int8_t)(c++)); + + // 4t (last) + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0xA0); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize4 = 2*PVA_ALIGNMENT; + codec._readBuffer->putInt(payloadSize4); + + for (int32_t i = 0; i < payloadSize4; i++) + codec._readBuffer->put((int8_t)(c++)); + + codec._readBuffer->flip(); + + int32_t payloadSizeSum = + payloadSize1+payloadSize2+payloadSize3+payloadSize4; + + codec._forcePayloadRead = payloadSizeSum; + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 1, + "%s: codec._receivedControlMessages.size() == 1 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + testOk(codec._readPollOneCount == 0, + "%s: codec._readPollOneCount == 0", CURRENT_FUNCTION); + + + PVAMessage msg = codec._receivedAppMessages[0]; + + testOk(msg._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + msg._payload->flip(); + + testOk( + (std::size_t)payloadSizeSum == msg._payload->getLimit(), + "%s: payloadSizeSum == msg._payload->getLimit()", + CURRENT_FUNCTION); + + for (int32_t i = 0; i < msg._payloadSize; i++) { + testOk((int8_t)i == msg._payload->getByte(), + "%s: (int8_t)i == msg._payload->getint8_t()", + CURRENT_FUNCTION); + } + + + msg = codec._receivedControlMessages[0]; + + testOk(msg._version == PVA_VERSION, + "%s: msg._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(msg._flags == 0x81, + "%s: msg._flags == 0x81", CURRENT_FUNCTION); + testOk(msg._command == 0xEE, + "%s: msg._command == 0xEE", CURRENT_FUNCTION); + testOk((std::size_t)msg._payloadSize == 0xDDCCBBAA, + "%s: msg._payloadSize == 0xDDCCBBAA", CURRENT_FUNCTION); + } + + + void testSegmentedInvalidInBetweenFlagsMessage() + { + // no misalignment + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + + // 1st + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x90); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize1 = PVA_ALIGNMENT; + codec._readBuffer->putInt(payloadSize1); + + int32_t c = 0; + for (int32_t i = 0; i < payloadSize1; i++) + codec._readBuffer->put((int8_t)(c++)); + + // 2nd + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + // invalid flag, should be 0xB0 + codec._readBuffer->put((int8_t)0x90); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize2 = 2*PVA_ALIGNMENT; + codec._readBuffer->putInt(payloadSize2); + + for (int32_t i = 0; i < payloadSize2; i++) + codec._readBuffer->put((int8_t)(c++)); + + // control in between + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x81); + codec._readBuffer->put((int8_t)0xEE); + codec._readBuffer->putInt(0xDDCCBBAA); + + // 3rd + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0xB0); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize3 = PVA_ALIGNMENT; + codec._readBuffer->putInt(payloadSize3); + + for (int32_t i = 0; i < payloadSize3; i++) + codec._readBuffer->put((int8_t)(c++)); + + // 4t (last) + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0xA0); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize4 = 2*PVA_ALIGNMENT; + codec._readBuffer->putInt(payloadSize4); + + for (int32_t i = 0; i < payloadSize4; i++) + codec._readBuffer->put((int8_t)(c++)); + + codec._readBuffer->flip(); + + int32_t payloadSizeSum = + payloadSize1+payloadSize2+payloadSize3+payloadSize4; + codec._forcePayloadRead = payloadSizeSum; + + try { + codec.processRead(); + testFail( + "%s: invalid_data_stream_exception, but not reported", + CURRENT_FUNCTION); + } catch(invalid_data_stream_exception &) { + testOk(true, "%s: invalid_data_stream_exception reported", + CURRENT_FUNCTION); + } + + testOk(codec._invalidDataStreamCount == 1, + "%s: codec._invalidDataStreamCount == 1", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 0, + "%s: codec._receivedAppMessages.size() == 0", + CURRENT_FUNCTION); + } + + + void testSegmentedMessageAlignment() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + + // 1st + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x90); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize1 = PVA_ALIGNMENT+1; + codec._readBuffer->putInt(payloadSize1); + + int32_t c = 0; + for (int32_t i = 0; i < payloadSize1; i++) + codec._readBuffer->put((int8_t)(c++)); + + std::size_t aligned = + AbstractCodec::alignedValue(payloadSize1, PVA_ALIGNMENT); + for (std::size_t i = payloadSize1; i < aligned; i++) + codec._readBuffer->put((int8_t)0xFF); + + + // 2nd + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0xB0); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize2 = 2*PVA_ALIGNMENT-1; + int32_t payloadSize2Real = + payloadSize2 + payloadSize1 % PVA_ALIGNMENT; + + codec._readBuffer->putInt(payloadSize2Real); + + // pre-message padding + for (int32_t i = 0; i < payloadSize1 % PVA_ALIGNMENT; i++) + codec._readBuffer->put((int8_t)0xEE); + + for (int32_t i = 0; i < payloadSize2; i++) + codec._readBuffer->put((int8_t)(c++)); + + aligned = + AbstractCodec::alignedValue( + payloadSize2Real, PVA_ALIGNMENT); + + for (std::size_t i = payloadSize2Real; i < aligned; i++) + codec._readBuffer->put((int8_t)0xFF); + + // 3rd + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0xB0); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize3 = PVA_ALIGNMENT+2; + int32_t payloadSize3Real = + payloadSize3 + payloadSize2Real % PVA_ALIGNMENT; + codec._readBuffer->putInt(payloadSize3Real); + + // pre-message padding required + for (int32_t i = 0; + i < payloadSize2Real % PVA_ALIGNMENT; i++) + codec._readBuffer->put((int8_t)0xEE); + + for (int32_t i = 0; i < payloadSize3; i++) + codec._readBuffer->put((int8_t)(c++)); + + aligned = + AbstractCodec::alignedValue( + payloadSize3Real, PVA_ALIGNMENT); + + for (std::size_t i = payloadSize3Real; i < aligned; i++) + codec._readBuffer->put((int8_t)0xFF); + + // 4t (last) + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0xA0); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize4 = 2*PVA_ALIGNMENT+3; + int32_t payloadSize4Real = + payloadSize4 + payloadSize3Real % PVA_ALIGNMENT; + + codec._readBuffer->putInt(payloadSize4Real); + + // pre-message padding required + for (int32_t i = 0; + i < payloadSize3Real % PVA_ALIGNMENT; i++) + codec._readBuffer->put((int8_t)0xEE); + + for (int32_t i = 0; i < payloadSize4; i++) + codec._readBuffer->put((int8_t)(c++)); + + aligned = + AbstractCodec::alignedValue( + payloadSize4Real, PVA_ALIGNMENT); + + for (std::size_t i = payloadSize4Real; i < aligned; i++) + codec._readBuffer->put((int8_t)0xFF); + + codec._readBuffer->flip(); + + int32_t payloadSizeSum = + payloadSize1+payloadSize2+payloadSize3+payloadSize4; + + codec._forcePayloadRead = payloadSizeSum; + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + testOk(codec._readPollOneCount == 0, + "%s: codec._readPollOneCount == 0", CURRENT_FUNCTION); + + PVAMessage msg = codec._receivedAppMessages[0]; + + testOk(msg._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + msg._payload->flip(); + + testOk( + (std::size_t)payloadSizeSum == msg._payload->getLimit(), + "%s: payloadSizeSum == msg._payload->getLimit()", + CURRENT_FUNCTION); + + for (int32_t i = 0; i < msg._payloadSize; i++) + testOk((int8_t)i == msg._payload->getByte(), + "%s: (int8_t)i == msg._payload->getByte()", + CURRENT_FUNCTION); + } + + + class ReadPollOneCallbackForTestSegmentedSplitMessage: + public ReadPollOneCallback { + + public: + + ReadPollOneCallbackForTestSegmentedSplitMessage( + TestCodec & codec, + int32_t realReadBufferEnd): + _codec(codec), _realReadBufferEnd(realReadBufferEnd) {} + + + void readPollOne() { + if (_codec._readPollOneCount == 1) + { + _codec._readBuffer->setLimit(_realReadBufferEnd); + } + else + throw std::logic_error("should not happen"); + } + + private: + TestCodec &_codec; + std::size_t _realReadBufferEnd; + }; + + + void testSegmentedSplitMessage() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + + for (int32_t firstMessagePayloadSize = 1; // cannot be zero + firstMessagePayloadSize <= 3*PVA_ALIGNMENT; + firstMessagePayloadSize++) + { + for (int32_t secondMessagePayloadSize = 0; + secondMessagePayloadSize <= 2*PVA_ALIGNMENT; + secondMessagePayloadSize++) + { + // cannot be zero + for (int32_t thirdMessagePayloadSize = 1; + thirdMessagePayloadSize <= 2*PVA_ALIGNMENT; + thirdMessagePayloadSize++) + { + std::size_t splitAt = 1; + while (true) + { + TestCodec codec(DEFAULT_BUFFER_SIZE, + DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + + // 1st + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x90); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize1 = firstMessagePayloadSize; + codec._readBuffer->putInt(payloadSize1); + + int32_t c = 0; + for (int32_t i = 0; i < payloadSize1; i++) + codec._readBuffer->put((int8_t)(c++)); + + std::size_t aligned = + AbstractCodec::alignedValue( + payloadSize1, PVA_ALIGNMENT); + + for (std::size_t i = payloadSize1; i < aligned; i++) + codec._readBuffer->put((int8_t)0xFF); + + // 2nd + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0xB0); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize2 = secondMessagePayloadSize; + int payloadSize2Real = + payloadSize2 + payloadSize1 % PVA_ALIGNMENT; + + codec._readBuffer->putInt(payloadSize2Real); + + // pre-message padding + for (int32_t i = 0; + i < payloadSize1 % PVA_ALIGNMENT; i++) + codec._readBuffer->put((int8_t)0xEE); + + for (int32_t i = 0; i < payloadSize2; i++) + codec._readBuffer->put((int8_t)(c++)); + + aligned = + AbstractCodec::alignedValue( + payloadSize2Real, PVA_ALIGNMENT); + + for (std::size_t i = payloadSize2Real; + i < aligned; i++) + codec._readBuffer->put((int8_t)0xFF); + + // 3rd + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0xA0); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize3 = thirdMessagePayloadSize; + int32_t payloadSize3Real = + payloadSize3 + payloadSize2Real % PVA_ALIGNMENT; + + codec._readBuffer->putInt(payloadSize3Real); + + // pre-message padding required + for (int32_t i = 0; + i < payloadSize2Real % PVA_ALIGNMENT; i++) + codec._readBuffer->put((int8_t)0xEE); + + for (int32_t i = 0; i < payloadSize3; i++) + codec._readBuffer->put((int8_t)(c++)); + + aligned = + AbstractCodec::alignedValue( + payloadSize3Real, PVA_ALIGNMENT); + + for (std::size_t i = payloadSize3Real; + i < aligned; i++) + codec._readBuffer->put((int8_t)0xFF); + + codec._readBuffer->flip(); + + std::size_t realReadBufferEnd = + codec._readBuffer->getLimit(); + + if (splitAt++ == realReadBufferEnd) + break; + + codec._readBuffer->setLimit(splitAt); + + std::auto_ptr + readPollOneCallback( + new ReadPollOneCallbackForTestSegmentedSplitMessage + (codec, realReadBufferEnd)); + + codec._readPollOneCallback = readPollOneCallback; + + + int32_t payloadSizeSum = + payloadSize1+payloadSize2+payloadSize3; + + codec._forcePayloadRead = payloadSizeSum; + + codec.processRead(); + + while (codec._invalidDataStreamCount == 0 && + codec._readBuffer->getPosition() != + realReadBufferEnd) + { + codec._readPollOneCount++; + codec._readPollOneCallback->readPollOne(); + codec.processRead(); + } + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + + + if (splitAt == realReadBufferEnd) { + testOk(0 == codec._readPollOneCount, + "%s: 0 == codec._readPollOneCount", + CURRENT_FUNCTION); + } + else { + testOk(1 == codec._readPollOneCount, + "%s: 1 == codec._readPollOneCount", + CURRENT_FUNCTION); + } + + PVAMessage msg = codec._receivedAppMessages[0]; + + testOk(msg._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + msg._payload->flip(); + + testOk((std::size_t)payloadSizeSum == + msg._payload->getLimit(), + "%s: payloadSizeSum == msg._payload->getLimit()", + CURRENT_FUNCTION); + + for (int32_t i = 0; i < msg._payloadSize; i++) { + testOk((int8_t)i == msg._payload->getByte(), + "%s: (int8_t)i == msg._payload->getByte()", + CURRENT_FUNCTION); + } + } + } + } + } + } + + + void testStartMessage() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec.putControlMessage((int8_t)0x23, 0x456789AB); + + codec.transferToReadBuffer(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 1, + "%s: codec._receivedControlMessages.size() == 1 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 0, + "%s: codec._receivedAppMessages.size() == 0", + CURRENT_FUNCTION); + + + PVAMessage header = codec._receivedControlMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == 0x81, + "%s: header._flags == 0x81", CURRENT_FUNCTION); + testOk(header._command == 0x23, + "%s: header._command == 0x23", CURRENT_FUNCTION); + testOk(header._payloadSize == 0x456789AB, + "%s: header._payloadSize == 0x456789AB", CURRENT_FUNCTION); + + codec.reset(); + + // two at the time, app and control + codec.setByteOrder(EPICS_ENDIAN_LITTLE); + codec.startMessage((int8_t)0x20, 0x00000000); + codec.endMessage(); + + codec.setByteOrder(EPICS_ENDIAN_BIG); + codec.putControlMessage((int8_t)0xEE, 0xDDCCBBAA); + + codec.transferToReadBuffer(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 1, + "%s: codec._receivedControlMessages.size() == 1 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + + // app, no payload + header = codec._receivedAppMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == 0x00, + "%s: header._flags == 0x00", CURRENT_FUNCTION); + testOk(header._command == 0x20, + "%s: header._command == 0x20", CURRENT_FUNCTION); + testOk(header._payloadSize == 0x00000000, + "%s: header._payloadSize == 0x00000000", CURRENT_FUNCTION); + + // control + header = codec._receivedControlMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == 0x81, + "%s: header._flags == 0x81", CURRENT_FUNCTION); + testOk(header._command == 0xEE, + "%s: header._command == 0xEE", CURRENT_FUNCTION); + testOk((std::size_t)header._payloadSize == 0xDDCCBBAA, + "%s: header._payloadSize == 0xDDCCBBAA", CURRENT_FUNCTION); + } + + + void testStartMessageNonEmptyPayload() + { + // no misalignment + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + codec.startMessage((int8_t)0x23, 0); + + codec.ensureBuffer((std::size_t)PVA_ALIGNMENT); + for (int32_t i = 0; i < PVA_ALIGNMENT; i++) + codec.getSendBuffer()->put((int8_t)i); + + codec.endMessage(); + + codec.transferToReadBuffer(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + + // app, no payload + PVAMessage header = codec._receivedAppMessages[0]; + + testOk(header._payload.get() != 0, + "%s: header._payload.get() != 0", CURRENT_FUNCTION); + + header._payload->flip(); + + testOk((std::size_t)PVA_ALIGNMENT == + header._payload->getLimit(), + "%s: PVA_ALIGNMENT == header._payload->getLimit()", + CURRENT_FUNCTION); + } + + + void testStartMessageNormalAlignment() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + + codec.startMessage((int8_t)0x23, 0); + int32_t payloadSize1 = PVA_ALIGNMENT+1; + codec.ensureBuffer(payloadSize1); + + for (int32_t i = 0; i < payloadSize1; i++) + codec.getSendBuffer()->put((int8_t)i); + + codec.endMessage(); + + codec.startMessage((int8_t)0x45, 0); + int32_t payloadSize2 = 2*PVA_ALIGNMENT-1; + codec.ensureBuffer(payloadSize2); + + for (int32_t i = 0; i < payloadSize2; i++) + codec.getSendBuffer()->put((int8_t)i); + + codec.endMessage(); + + codec.transferToReadBuffer(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 2, + "%s: codec._receivedAppMessages.size() == 2", + CURRENT_FUNCTION); + + PVAMessage msg = codec._receivedAppMessages[0]; + + testOk(msg._payloadSize == payloadSize1, + "%s: msg._payloadSize == payloadSize1", CURRENT_FUNCTION); + testOk(msg._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + msg._payload->flip(); + + testOk((std::size_t)payloadSize1 == + msg._payload->getLimit(), + "%s: payloadSize1 == msg._payload->getLimit()", + CURRENT_FUNCTION); + + for (int32_t i = 0; i < msg._payloadSize; i++) + testOk((int8_t)i == msg._payload->getByte(), + "%s: (int8_t)i == msg._payload->getByte()", + CURRENT_FUNCTION); + + msg = codec._receivedAppMessages[1]; + + testOk(msg._payloadSize == payloadSize2, + "%s: msg._payloadSize == payloadSize2", CURRENT_FUNCTION); + testOk(msg._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + msg._payload->flip(); + + testOk((std::size_t)payloadSize2 == + msg._payload->getLimit(), + "%s: payloadSize2 == msg._payload->getLimit()", + CURRENT_FUNCTION); + + for (int32_t i = 0; i < msg._payloadSize; i++) + testOk((int8_t)i == msg._payload->getByte(), + "%s: (int8_t)i == msg._payload->getByte()", + CURRENT_FUNCTION); + } + + + void testStartMessageSegmentedMessage() + { + // no misalignment + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + + codec.startMessage((int8_t)0x01, 0); + + int32_t c = 0; + + int32_t payloadSize1 = PVA_ALIGNMENT; + for (int32_t i = 0; i < payloadSize1; i++) + codec.getSendBuffer()->put((int8_t)(c++)); + + codec.flush(false); + + int32_t payloadSize2 = 2*PVA_ALIGNMENT; + for (int32_t i = 0; i < payloadSize2; i++) + codec.getSendBuffer()->put((int8_t)(c++)); + + codec.flush(false); + + int32_t payloadSize3 = PVA_ALIGNMENT; + for (int32_t i = 0; i < payloadSize3; i++) + codec.getSendBuffer()->put((int8_t)(c++)); + + codec.flush(false); + + int32_t payloadSize4 = 2*PVA_ALIGNMENT; + for (int32_t i = 0; i < payloadSize4; i++) + codec.getSendBuffer()->put((int8_t)(c++)); + + codec.endMessage(); + + codec.transferToReadBuffer(); + + int32_t payloadSizeSum = + payloadSize1+payloadSize2+payloadSize3+payloadSize4; + + codec._forcePayloadRead = payloadSizeSum; + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + testOk(codec._readPollOneCount == 0, + "%s: codec._readPollOneCount == 0", CURRENT_FUNCTION); + + PVAMessage msg = codec._receivedAppMessages[0]; + + testOk(msg._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + msg._payload->flip(); + + testOk((std::size_t)payloadSizeSum == + msg._payload->getLimit(), + "%s: payloadSizeSum == msg._payload->getLimit()", + CURRENT_FUNCTION); + + for (int32_t i = 0; i < msg._payloadSize; i++) + testOk((int8_t)i == msg._payload->getByte(), + "%s: (int8_t)i == msg._payload->getByte()", + CURRENT_FUNCTION); + } + + + void testStartMessageSegmentedMessageAlignment() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + + for (int32_t firstMessagePayloadSize = 1; // cannot be zero + firstMessagePayloadSize <= 3*PVA_ALIGNMENT; + firstMessagePayloadSize++) + { + for (int32_t secondMessagePayloadSize = 0; + secondMessagePayloadSize <= 2*PVA_ALIGNMENT; + secondMessagePayloadSize++) + { + // cannot be zero + for (int32_t thirdMessagePayloadSize = 1; + thirdMessagePayloadSize <= 2*PVA_ALIGNMENT; + thirdMessagePayloadSize++) + { + // cannot be zero + for (int32_t fourthMessagePayloadSize = 1; + fourthMessagePayloadSize <= 2*PVA_ALIGNMENT; + fourthMessagePayloadSize++) + { + TestCodec codec(DEFAULT_BUFFER_SIZE, + DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + + codec.startMessage((int8_t)0x01, 0); + + int32_t c = 0; + + int32_t payloadSize1 = firstMessagePayloadSize; + for (int32_t i = 0; i < payloadSize1; i++) + codec.getSendBuffer()->put((int8_t)(c++)); + + codec.flush(false); + + int32_t payloadSize2 = secondMessagePayloadSize; + for (int32_t i = 0; i < payloadSize2; i++) + codec.getSendBuffer()->put((int8_t)(c++)); + + codec.flush(false); + + int32_t payloadSize3 = thirdMessagePayloadSize; + for (int32_t i = 0; i < payloadSize3; i++) + codec.getSendBuffer()->put((int8_t)(c++)); + + codec.flush(false); + + int32_t payloadSize4 = fourthMessagePayloadSize; + for (int32_t i = 0; i < payloadSize4; i++) + codec.getSendBuffer()->put((int8_t)(c++)); + + codec.endMessage(); + + codec.transferToReadBuffer(); + + int32_t payloadSizeSum = + payloadSize1+payloadSize2+payloadSize3 + +payloadSize4; + + codec._forcePayloadRead = payloadSizeSum; + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + testOk(codec._readPollOneCount == 0, + "%s: codec._readPollOneCount == 0", + CURRENT_FUNCTION); + + PVAMessage msg = codec._receivedAppMessages[0]; + + testOk(msg._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + msg._payload->flip(); + + testOk((std::size_t)payloadSizeSum == + msg._payload->getLimit(), + "%s: payloadSizeSum == msg._payload->getLimit()", + CURRENT_FUNCTION); + + for (int32_t i = 0; i < msg._payloadSize; i++) { + if ((int8_t)i != msg._payload->getByte()) { + testFail( + "%s: (int8_t)%d == msg._payload->getByte()", + CURRENT_FUNCTION, (int8_t)i); + } + } + } + } + } + } + } + + + void testReadNormalConnectionLoss() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + codec._disconnected = true; + + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x01); + codec._readBuffer->put((int8_t)0x23); + codec._readBuffer->putInt(0x456789AB); + + codec._readBuffer->flip(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 1, + "%s: codec._closedCount == 1", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 0, + "%s: codec._receivedAppMessages.size() == 0", + CURRENT_FUNCTION); + } + + + class ReadPollOneCallbackForTestSegmentedSplitConnectionLoss: + public ReadPollOneCallback { + + public: + + ReadPollOneCallbackForTestSegmentedSplitConnectionLoss( + TestCodec & codec): _codec(codec) {} + + + void readPollOne() { + if (_codec._readPollOneCount == 1) + { + _codec._disconnected = true; + } + else + throw std::logic_error("should not happen"); + } + + private: + TestCodec &_codec; + }; + + + void testSegmentedSplitConnectionLoss() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + + + for (int32_t firstMessagePayloadSize = 1; // cannot be zero + firstMessagePayloadSize <= 3*PVA_ALIGNMENT; + firstMessagePayloadSize++) + { + for (int32_t secondMessagePayloadSize = 0; + secondMessagePayloadSize <= 2*PVA_ALIGNMENT; + secondMessagePayloadSize++) + { + // cannot be zero + for (int32_t thirdMessagePayloadSize = 1; + thirdMessagePayloadSize <= 2*PVA_ALIGNMENT; + thirdMessagePayloadSize++) + { + std::size_t splitAt = 1; + + while (true) + { + TestCodec codec(DEFAULT_BUFFER_SIZE, + DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + + // 1st + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0x90); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize1 = firstMessagePayloadSize; + codec._readBuffer->putInt(payloadSize1); + + int32_t c = 0; + for (int32_t i = 0; i < payloadSize1; i++) + codec._readBuffer->put((int8_t)(c++)); + + std::size_t aligned = + AbstractCodec::alignedValue( + payloadSize1, PVA_ALIGNMENT); + + for (std::size_t i = payloadSize1; i < aligned; i++) + codec._readBuffer->put((int8_t)0xFF); + + // 2nd + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0xB0); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize2 = secondMessagePayloadSize; + int32_t payloadSize2Real = + payloadSize2 + payloadSize1 % PVA_ALIGNMENT; + + codec._readBuffer->putInt(payloadSize2Real); + + // pre-message padding + for (int32_t i = 0; + i < payloadSize1 % PVA_ALIGNMENT; i++) + codec._readBuffer->put((int8_t)0xEE); + + for (int32_t i = 0; i < payloadSize2; i++) + codec._readBuffer->put((int8_t)(c++)); + + aligned = + AbstractCodec::alignedValue( + payloadSize2Real, PVA_ALIGNMENT); + + for (std::size_t i = payloadSize2Real; + i < aligned; i++) + codec._readBuffer->put((int8_t)0xFF); + + // 3rd + codec._readBuffer->put(PVA_MAGIC); + codec._readBuffer->put(PVA_VERSION); + codec._readBuffer->put((int8_t)0xA0); + codec._readBuffer->put((int8_t)0x01); + + int32_t payloadSize3 = thirdMessagePayloadSize; + int32_t payloadSize3Real = + payloadSize3 + payloadSize2Real % PVA_ALIGNMENT; + + codec._readBuffer->putInt(payloadSize3Real); + + // pre-message padding required + for (int32_t i = 0; + i < payloadSize2Real % PVA_ALIGNMENT; i++) + codec._readBuffer->put((int8_t)0xEE); + + for (int32_t i = 0; i < payloadSize3; i++) + codec._readBuffer->put((int8_t)(c++)); + + aligned = + AbstractCodec::alignedValue( + payloadSize3Real, PVA_ALIGNMENT); + + for (std::size_t i = payloadSize3Real; + i < aligned; i++) + codec._readBuffer->put((int8_t)0xFF); + + codec._readBuffer->flip(); + + std::size_t realReadBufferEnd = + codec._readBuffer->getLimit(); + + if (splitAt++ == realReadBufferEnd-1) + break; + + codec._readBuffer->setLimit(splitAt); + + std::auto_ptr + readPollOneCallback( new + ReadPollOneCallbackForTestSegmentedSplitConnectionLoss + (codec)); + + + codec._readPollOneCallback = readPollOneCallback; + + int32_t payloadSizeSum = + payloadSize1+payloadSize2+payloadSize3; + + codec._forcePayloadRead = payloadSizeSum; + + codec.processRead(); + + while (codec._closedCount == 0 && + codec._invalidDataStreamCount == 0 && + codec._readBuffer->getPosition() != + realReadBufferEnd) + { + codec._readPollOneCount++; + codec._readPollOneCallback->readPollOne(); + codec.processRead(); + } + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 1, + "%s: codec._closedCount == 1", CURRENT_FUNCTION); + } + } + } + } + } + + + void testSendConnectionLoss() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + codec._disconnected = true; + + codec.putControlMessage((int8_t)0x23, 0x456789AB); + + try + { + codec.transferToReadBuffer(); + testFail("%s: connection lost, but not reported", + CURRENT_FUNCTION); + } + catch (connection_closed_exception & ) { + testOk(true, "%s: connection closed exception expected", + CURRENT_FUNCTION); + } + + testOk(codec._closedCount == 1, + "%s: codec._closedCount == 1", CURRENT_FUNCTION); + } + + + class TransportSenderForTestEnqueueSendRequest: + public TransportSender { + public: + + TransportSenderForTestEnqueueSendRequest( + TestCodec & codec): _codec(codec) {} + + void unlock() { + } + + void lock() { + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) + { + _codec.startMessage((int8_t)0x20, 0x00000000); + _codec.endMessage(); + } + + private: + TestCodec &_codec; + }; + + + class TransportSender2ForTestEnqueueSendRequest: + public TransportSender { + public: + + TransportSender2ForTestEnqueueSendRequest( + TestCodec & codec): _codec(codec) {} + + void unlock() { + } + + void lock() { + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) + { + _codec.putControlMessage((int8_t)0xEE, 0xDDCCBBAA); + } + + private: + TestCodec &_codec; + }; + + + void testEnqueueSendRequest() + { + + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + std::tr1::shared_ptr sender = + std::tr1::shared_ptr( + new TransportSenderForTestEnqueueSendRequest(codec)); + + std::tr1::shared_ptr sender2 = + std::tr1::shared_ptr( + new TransportSender2ForTestEnqueueSendRequest(codec)); + + // process + codec.enqueueSendRequest(sender); + codec.enqueueSendRequest(sender2); + codec.processSendQueue(); + + codec.transferToReadBuffer(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 1, + "%s: codec._receivedControlMessages.size() == 1 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + + + // app, no payload + PVAMessage header = codec._receivedAppMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == 0x80, + "%s: header._flags == 0x80", CURRENT_FUNCTION); + testOk(header._command == 0x20, + "%s: header._command == 0x20", CURRENT_FUNCTION); + testOk(header._payloadSize == 0x00000000, + "%s: header._payloadSize == 0x00000000", CURRENT_FUNCTION); + + // control + header = codec._receivedControlMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == 0x81, + "%s: header._flags == 0x81", CURRENT_FUNCTION); + testOk(header._command == 0xEE, + "%s: header._command == 0xEE", CURRENT_FUNCTION); + testOk((std::size_t)header._payloadSize == 0xDDCCBBAA, + "%s: header._payloadSize == 0xDDCCBBAA", CURRENT_FUNCTION); + } + + + class TransportSenderForTestEnqueueSendDirectRequest: + public TransportSender { + public: + + TransportSenderForTestEnqueueSendDirectRequest( + TestCodec & codec): _codec(codec) {} + + void unlock() { + } + + void lock() { + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) + { + _codec.startMessage((int8_t)0x20, 0x00000000); + _codec.endMessage(); + } + + private: + TestCodec &_codec; + }; + + + class TransportSender2ForTestEnqueueSendDirectRequest: + public TransportSender { + public: + + TransportSender2ForTestEnqueueSendDirectRequest( + TestCodec & codec): _codec(codec) {} + + void unlock() { + } + + void lock() { + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) + { + _codec.putControlMessage((int8_t)0xEE, + 0xDDCCBBAA); + } + + private: + TestCodec &_codec; + }; + + + void testEnqueueSendDirectRequest() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + std::tr1::shared_ptr sender = + std::tr1::shared_ptr( + new TransportSenderForTestEnqueueSendDirectRequest(codec)); + std::tr1::shared_ptr sender2 = + std::tr1::shared_ptr( + new TransportSender2ForTestEnqueueSendDirectRequest + (codec)); + + + // thread not right + codec.enqueueSendRequest(sender, PVA_MESSAGE_HEADER_SIZE); + + + testOk(1 == codec._scheduleSendCount, + "%s: 1 == codec._scheduleSendCount", CURRENT_FUNCTION); + testOk(0 == codec._receivedControlMessages.size(), + "%s: 0 == codec._receivedControlMessages.size()", + CURRENT_FUNCTION); + testOk(0 == codec._receivedAppMessages.size(), + "%s: 0 == codec._receivedAppMessages.size()", + CURRENT_FUNCTION); + + codec.setSenderThread(); + + // not empty queue + codec.enqueueSendRequest(sender2, PVA_MESSAGE_HEADER_SIZE); + + testOk(2 == codec._scheduleSendCount, + "%s: 2 == codec._scheduleSendCount", CURRENT_FUNCTION); + testOk(0 == codec._receivedControlMessages.size(), + "%s: 0 == codec._receivedControlMessages.size()", + CURRENT_FUNCTION); + testOk(0 == codec._receivedAppMessages.size(), + "%s: 0 == codec._receivedAppMessages.size()", + CURRENT_FUNCTION); + + // send will be triggered after last + //was processed + testOk(0 == codec._sendCompletedCount, + "%s: 0 == codec._sendCompletedCount", CURRENT_FUNCTION); + + codec.processSendQueue(); + + testOk(1 == codec._sendCompletedCount, + "%s: 1 == codec._sendCompletedCount", CURRENT_FUNCTION); + + codec.transferToReadBuffer(); + + codec.processRead(); + + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 1, + "%s: codec._receivedControlMessages.size() == 1 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + + // app, no payload + PVAMessage header = codec._receivedAppMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == 0x80, + "%s: header._flags == 0x80", CURRENT_FUNCTION); + testOk(header._command == 0x20, + "%s: header._command == 0x20", CURRENT_FUNCTION); + testOk(header._payloadSize == 0x00000000, + "%s: header._payloadSize == 0x00000000", CURRENT_FUNCTION); + + + // control + header = codec._receivedControlMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == 0x81, + "%s: header._flags == 0x81", CURRENT_FUNCTION); + testOk(header._command == 0xEE, + "%s: header._command == 0xEE", CURRENT_FUNCTION); + testOk((std::size_t)header._payloadSize == 0xDDCCBBAA, + "%s: header._payloadSize == 0xDDCCBBAA", CURRENT_FUNCTION); + + + testOk(0 == codec.getSendBuffer()->getPosition(), + "%s: 0 == codec.getSendBuffer()->getPosition()", + CURRENT_FUNCTION); + + // now queue is empty and thread is right + codec.enqueueSendRequest(sender2, PVA_MESSAGE_HEADER_SIZE); + + testOk((std::size_t)PVA_MESSAGE_HEADER_SIZE == + codec.getSendBuffer()->getPosition(), + "%s: PVA_MESSAGE_HEADER_SIZE == " + "codec.getSendBuffer()->getPosition()", + CURRENT_FUNCTION); + testOk(3 == codec._scheduleSendCount, + "%s: 3 == codec._scheduleSendCount", CURRENT_FUNCTION); + testOk(1 == codec._sendCompletedCount, + "%s: 1 == codec._sendCompletedCount", CURRENT_FUNCTION); + + codec.processWrite(); + + testOk(2 == codec._sendCompletedCount, + "%s: 2 == codec._sendCompletedCount", CURRENT_FUNCTION); + + + codec.transferToReadBuffer(); + codec.processRead(); + + testOk(codec._receivedControlMessages.size() == 2, + "%s: codec._receivedControlMessages.size() == 2 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + + header = codec._receivedControlMessages[1]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == 0x81, + "%s: header._flags == 0x81", CURRENT_FUNCTION); + testOk(header._command == 0xEE, + "%s: header._command == 0xEE", CURRENT_FUNCTION); + testOk((std::size_t)header._payloadSize == 0xDDCCBBAA, + "%s: header._payloadSize == 0xDDCCBBAA", CURRENT_FUNCTION); + } + + + + class TransportSenderForTestSendPerPartes: + public TransportSender { + public: + + TransportSenderForTestSendPerPartes( + TestCodec & codec, std::size_t bytesToSend): + _codec(codec), _bytesToSent(bytesToSend) {} + + void unlock() { + } + + void lock() { + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) + { + _codec.startMessage((int8_t)0x12, _bytesToSent); + + for (std::size_t i = 0; i < _bytesToSent; i++) + _codec.getSendBuffer()->put((int8_t)i); + + _codec.endMessage(); + } + + private: + TestCodec &_codec; + std::size_t _bytesToSent; + }; + + + void testSendPerPartes() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + + std::size_t bytesToSent = + DEFAULT_BUFFER_SIZE - 2*PVA_MESSAGE_HEADER_SIZE; + + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + std::tr1::shared_ptr sender = + std::tr1::shared_ptr( + new TransportSenderForTestSendPerPartes( + codec, bytesToSent)); + + codec._readPayload = true; + + // process + codec.enqueueSendRequest(sender); + codec.processSendQueue(); + + codec.transferToReadBuffer(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + + + // app + PVAMessage header = codec._receivedAppMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == 0x80, + "%s: header._flags == 0x80", CURRENT_FUNCTION); + testOk(header._command == 0x12, + "%s: header._command == 0x12", CURRENT_FUNCTION); + testOk((std::size_t)header._payloadSize == bytesToSent, + "%s: header._payloadSize == bytesToSent", + CURRENT_FUNCTION); + + + testOk(header._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + header._payload->flip(); + + testOk(bytesToSent == header._payload->getLimit(), + "%s: bytesToSent == header._payload->getLimit()", + CURRENT_FUNCTION); + + for (int32_t i = 0; i < header._payloadSize; i++) { + if ((int8_t)i != header._payload->getByte()) { + testFail("%s: (int8_t)%d == header._payload->getByte()", + CURRENT_FUNCTION, i); + } + } + } + + + class TransportSenderForTestSendException: + public TransportSender { + public: + + TransportSenderForTestSendException( + TestCodec & codec): _codec(codec) {} + + void unlock() { + } + + void lock() { + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) + { + _codec.putControlMessage((int8_t)0x01, 0x00112233); + } + + private: + TestCodec &_codec; + }; + + + void testSendException() + { + + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + std::tr1::shared_ptr sender = + std::tr1::shared_ptr( + new TransportSenderForTestSendException(codec)); + + codec._throwExceptionOnSend = true; + + // process + codec.enqueueSendRequest(sender); + + try + { + codec.processSendQueue(); + testFail("%s: ConnectionClosedException expected", + CURRENT_FUNCTION); + } catch (connection_closed_exception &) { + // OK + } + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 1, + "%s: codec._closedCount == 1", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 0, + "%s: codec._receivedAppMessages.size() == 0", + CURRENT_FUNCTION); + } + + + class TransportSenderForTestSendHugeMessagePartes: + public TransportSender { + public: + + TransportSenderForTestSendHugeMessagePartes( + TestCodec & codec, std::size_t bytesToSend): + _codec(codec), _bytesToSent(bytesToSend) {} + + void unlock() { + } + + void lock() { + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) + { + _codec.startMessage((int8_t)0x12, 0); + std::size_t toSend = _bytesToSent; + int32_t c = 0; + while (toSend > 0) + { + std::size_t sendNow = std::min(toSend, + AbstractCodec::MAX_ENSURE_BUFFER_SIZE); + + _codec.ensureBuffer(sendNow); + for (std::size_t i = 0; i < sendNow; i++) + _codec.getSendBuffer()->put((int8_t)(c++)); + toSend -= sendNow; + } + _codec.endMessage(); + } + + private: + TestCodec &_codec; + std::size_t _bytesToSent; + }; + + + class WritePollOneCallbackForTestSendHugeMessagePartes: + public WritePollOneCallback { + public: + + WritePollOneCallbackForTestSendHugeMessagePartes( + TestCodec &codec): _codec(codec) {} + + void writePollOne() { + _codec.processWrite(); // this should return immediately + + // now we fake reading + _codec._writeBuffer.flip(); + + // in this test we made sure readBuffer is big enough + while(_codec._writeBuffer.getRemaining() > 0) + { + _codec._readBuffer->putByte( + _codec._writeBuffer.getByte()); + } + + _codec._writeBuffer.clear(); + } + private: + TestCodec & _codec; + }; + + + void testSendHugeMessagePartes() + { + + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + + std::size_t bytesToSent = 10*DEFAULT_BUFFER_SIZE+1; + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + codec._readPayload = true; + codec._readBuffer.reset( + new ByteBuffer(11*DEFAULT_BUFFER_SIZE)); + + std::auto_ptr + writePollOneCallback( + new WritePollOneCallbackForTestSendHugeMessagePartes + (codec)); + + std::tr1::shared_ptr sender = + std::tr1::shared_ptr( + new TransportSenderForTestSendHugeMessagePartes( + codec, bytesToSent)); + + codec._writePollOneCallback = writePollOneCallback; + + + // process + codec.enqueueSendRequest(sender); + codec.processSendQueue(); + + codec.addToReadBuffer(); + + codec._forcePayloadRead = bytesToSent; + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 0, + "%s: codec._closedCount == 0", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 1, + "%s: codec._receivedAppMessages.size() == 1", + CURRENT_FUNCTION); + + // app + PVAMessage header = codec._receivedAppMessages[0]; + + testOk(header._version == PVA_VERSION, + "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); + testOk(header._flags == (int8_t)(0x80 | 0x10), + "%s: header._flags == (int8_t)(0x80 | 0x10)", + CURRENT_FUNCTION); + testOk(header._command == 0x12, + "%s: header._command == 0x12", CURRENT_FUNCTION); + + testOk(header._payload.get() != 0, + "%s: msg._payload.get() != 0", CURRENT_FUNCTION); + + header._payload->flip(); + + testOk(bytesToSent == header._payload->getLimit(), + "%s: bytesToSent == header._payload->getLimit()", + CURRENT_FUNCTION); + + + for (int32_t i = 0; i < header._payloadSize; i++) { + if ((int8_t)i != header._payload->getByte()) { + testFail("%s: (int8_t)%d == header._payload->getByte()", + CURRENT_FUNCTION, i); + } + } + + } + + + void testRecipient() + { + // nothing to test, depends on implementation + testSkip(1, " testRecipient()"); + } + + + class TransportSenderForTestClearSendQueue: + public TransportSender { + public: + + TransportSenderForTestClearSendQueue( + TestCodec & codec): _codec(codec) {} + + void unlock() { + } + + void lock() { + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) + { + _codec.startMessage((int8_t)0x20, 0x00000000); + _codec.endMessage(); + } + + private: + TestCodec &_codec; + }; + + + class TransportSender2ForTestClearSendQueue: + public TransportSender { + public: + + TransportSender2ForTestClearSendQueue( + TestCodec & codec): _codec(codec) {} + + void unlock() { + } + + void lock() { + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) + { + _codec.putControlMessage((int8_t)0xEE, 0xDDCCBBAA); + } + + private: + TestCodec &_codec; + }; + + + void testClearSendQueue() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + std::tr1::shared_ptr sender = + std::tr1::shared_ptr( + new TransportSenderForTestClearSendQueue(codec)); + + std::tr1::shared_ptr sender2 = + std::tr1::shared_ptr( + new TransportSender2ForTestClearSendQueue(codec)); + + + codec.enqueueSendRequest(sender); + codec.enqueueSendRequest(sender2); + + codec.clearSendQueue(); + + codec.processSendQueue(); + + testOk(0 == codec.getSendBuffer()->getPosition(), + "%s: 0 == codec.getSendBuffer()->getPosition()", + CURRENT_FUNCTION); + testOk(0 == codec._writeBuffer.getPosition(), + "%s: 0 == codec._writeBuffer.getPosition()", + CURRENT_FUNCTION); + } + + + void testInvalidArguments() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + + try + { + // too small + TestCodec codec(1,DEFAULT_BUFFER_SIZE); + testFail("%s: too small buffer accepted", + CURRENT_FUNCTION); + } catch (std::exception &) { + // OK + } + + try + { + // too small + TestCodec codec(DEFAULT_BUFFER_SIZE,1); + testFail("%s: too small buffer accepted", + CURRENT_FUNCTION); + } catch (std::exception &) { + // OK + } + + if (PVA_ALIGNMENT > 1) + { + try + { + // non aligned + TestCodec codec(2*AbstractCodec::MAX_ENSURE_SIZE+1, + DEFAULT_BUFFER_SIZE); + + testFail("%s: non-aligned buffer size accepted", + CURRENT_FUNCTION); + + } catch (std::exception &) { + // OK + } + + try + { + // non aligned + TestCodec codec(DEFAULT_BUFFER_SIZE, + 2*AbstractCodec::MAX_ENSURE_SIZE+1); + + testFail("%s: non-aligned buffer size accepted", + CURRENT_FUNCTION); + + } catch (std::exception &) { + // OK + } + } + + TestCodec codec(DEFAULT_BUFFER_SIZE, + DEFAULT_BUFFER_SIZE); + + try + { + codec.ensureBuffer(DEFAULT_BUFFER_SIZE+1); + testFail("%s: too big size accepted", + CURRENT_FUNCTION); + } catch (std::exception &) { + // OK + } + + try + { + codec.ensureData(AbstractCodec::MAX_ENSURE_DATA_SIZE+1); + testFail("%s: too big size accepted", CURRENT_FUNCTION); + } catch (std::exception &) { + // OK + } + } + + + void testDefaultModes() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + testOk(NORMAL== codec.getReadMode(), + "%s: NORMAL== codec.getReadMode()", CURRENT_FUNCTION); + testOk(PROCESS_SEND_QUEUE == codec.getWriteMode(), + "%s: PROCESS_SEND_QUEUE == codec.getWriteMode()", + CURRENT_FUNCTION); + } + + + class TransportSenderForTestEnqueueSendRequestExceptionThrown: + public TransportSender { + public: + + TransportSenderForTestEnqueueSendRequestExceptionThrown( + TestCodec & codec): _codec(codec) {} + + void unlock() { + } + + void lock() { + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) + { + throw connection_closed_exception( + "expected test exception"); + } + + private: + TestCodec &_codec; + }; + + + class TransportSender2ForTestEnqueueSendRequestExceptionThrown: + public TransportSender { + public: + + TransportSender2ForTestEnqueueSendRequestExceptionThrown( + TestCodec & codec): _codec(codec) {} + + void unlock() { + } + + void lock() { + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) + { + _codec.putControlMessage((int8_t)0xEE, 0xDDCCBBAA); + } + + private: + TestCodec &_codec; + }; + + + void testEnqueueSendRequestExceptionThrown() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + + TestCodec codec(DEFAULT_BUFFER_SIZE,DEFAULT_BUFFER_SIZE); + + std::tr1::shared_ptr sender = + std::tr1::shared_ptr( + new + TransportSenderForTestEnqueueSendRequestExceptionThrown + (codec)); + + std::tr1::shared_ptr sender2 = + std::tr1::shared_ptr(new + TransportSender2ForTestEnqueueSendRequestExceptionThrown( + codec)); + + + // process + codec.enqueueSendRequest(sender); + codec.enqueueSendRequest(sender2); + + try + { + codec.processSendQueue(); + testFail("%s: ConnectionClosedException expected", + CURRENT_FUNCTION); + + } catch (connection_closed_exception &) { + // OK + } + + codec.transferToReadBuffer(); + + codec.processRead(); + + testOk(codec._invalidDataStreamCount == 0, + "%s: codec._invalidDataStreamCount == 0", + CURRENT_FUNCTION); + testOk(codec._closedCount == 1, + "%s: codec._closedCount == 1", CURRENT_FUNCTION); + testOk(codec._receivedControlMessages.size() == 0, + "%s: codec._receivedControlMessages.size() == 0 ", + CURRENT_FUNCTION); + testOk(codec._receivedAppMessages.size() == 0, + "%s: codec._receivedAppMessages.size() == 0", + CURRENT_FUNCTION); + } + + + class TransportSenderForTestBlockingProcessQueueTest: + public TransportSender { + public: + + TransportSenderForTestBlockingProcessQueueTest( + TestCodec & codec): _codec(codec) {} + + void unlock() { + } + + void lock() { + } + + void send(epics::pvData::ByteBuffer* buffer, + TransportSendControl* control) + { + _codec.putControlMessage((int8_t)0x01, 0x00112233); + } + + private: + TestCodec &_codec; + }; + + + class ValueHolder { + public: + ValueHolder( + TestCodec &testCodec, + AtomicValue &processTreadExited): + _testCodec(testCodec), + _processTreadExited(processTreadExited) {} + + TestCodec &_testCodec; + AtomicValue & _processTreadExited; + }; + + + void testBlockingProcessQueueTest() + { + testDiag("BEGIN TEST %s:", CURRENT_FUNCTION); + + TestCodec codec(DEFAULT_BUFFER_SIZE, + DEFAULT_BUFFER_SIZE, true); + + _processTreadExited.getAndSet(false); + std::tr1::shared_ptr sender = + std::tr1::shared_ptr( + new TransportSenderForTestBlockingProcessQueueTest(codec)); + + ValueHolder valueHolder(codec, _processTreadExited); + + epicsThreadCreate( + "testBlockingProcessQueueTest-processThread", + epicsThreadPriorityMedium, + epicsThreadGetStackSize( + epicsThreadStackMedium), + CodecTest::blockingProcessQueueThread, + &valueHolder); + + epicsThreadSleep(3); + + testOk(_processTreadExited.get() == false, + "%s: _processTreadExited.get() == false", + CURRENT_FUNCTION); + + // let's put something into it + + codec.enqueueSendRequest(sender); + + epicsThreadSleep(1); + + testOk((std::size_t)PVA_MESSAGE_HEADER_SIZE == + codec._writeBuffer.getPosition(), + "%s: PVA_MESSAGE_HEADER_SIZE == " + "codec._writeBuffer.getPosition()", + CURRENT_FUNCTION); + + codec.endBlockedProcessSendQueue(); + + epicsThreadSleep(1); + + testOk(_processTreadExited.get() == true, + "%s: _processTreadExited.get() == true", CURRENT_FUNCTION); + } + + private: + + void static blockingProcessQueueThread(void *param) { + ValueHolder *valueHolder = static_cast(param); + // this should block + valueHolder->_testCodec.processSendQueue(); + valueHolder->_processTreadExited.getAndSet(true); + } + + AtomicValue _processTreadExited; + }; + } +} + + +using namespace epics::pvAccess; + +MAIN(testCodec) +{ + CodecTest codecTest; + return codecTest.runAllTest(); +}