diff --git a/pvAccessApp/remote/codec.cpp b/pvAccessApp/remote/codec.cpp index 80a5a9c..6165f82 100644 --- a/pvAccessApp/remote/codec.cpp +++ b/pvAccessApp/remote/codec.cpp @@ -39,8 +39,8 @@ namespace epics { const std::size_t AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE = 1024; AbstractCodec::AbstractCodec( - std::tr1::shared_ptr receiveBuffer, - std::tr1::shared_ptr sendBuffer, + std::tr1::shared_ptr const & receiveBuffer, + std::tr1::shared_ptr const & sendBuffer, int32_t socketSendBufferSize, bool blockingProcessQueue): //PROTECTED @@ -56,7 +56,8 @@ namespace epics { _maxSendPayloadSize(0), _lastMessageStartPosition(0),_lastSegmentedMessageType(0), _lastSegmentedMessageCommand(0), _nextMessagePayloadOffset(0), - _byteOrderFlag(0x80),_socketSendBufferSize(0) + _byteOrderFlag(EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00), + _socketSendBufferSize(0) { if (receiveBuffer->getSize() < 2*MAX_ENSURE_SIZE) throw std::invalid_argument( @@ -109,7 +110,7 @@ namespace epics { processReadSegmented(); break; case SPLIT: - throw std::logic_error("SPLIT NOT SUPPORTED"); + throw std::logic_error("ReadMode == SPLIT not supported"); } } @@ -197,65 +198,16 @@ namespace epics { { // handle response processApplicationMessage(); - //TODO: MATEJ CHECK - throw simulate_finally_exception("go to finally block"); + + postProcessApplicationMessage(); } catch(...) //finally { - if (!isOpen()) - return; + if (!isOpen()) + return; - // can be closed by now - // isOpen() should be efficiently implemented - while (true) - //while (isOpen()) - { - // set position as whole message was read - //(in case code haven't done so) - std::size_t newPosition = - alignedValue( - _storedPosition + _storedPayloadSize, PVA_ALIGNMENT); - - // aligned buffer size ensures that there is enough space - //in buffer, - // however data might not be fully read - - // discard the rest of the packet - if (newPosition > _storedLimit) - { - // processApplicationMessage() did not read up - //quite some buffer - - // we only handle unused alignment bytes - int bytesNotRead = - newPosition - _socketBuffer->getPosition(); - - if (bytesNotRead < PVA_ALIGNMENT) - { - // make alignment bytes as real payload to enable SPLIT - // no end-of-socket or segmented scenario can happen - // due to aligned buffer size - _storedPayloadSize += bytesNotRead; - // reveal currently existing padding - _socketBuffer->setLimit(_storedLimit); - ensureData(bytesNotRead); - _storedPayloadSize -= bytesNotRead; - continue; - } - - // TODO we do not handle this for now (maybe never) - LOG(logLevelWarn, - "unprocessed read buffer from client at %s:%d: %d," - " disconnecting...", - __FILE__, __LINE__, getLastReadBufferSocketAddress()); - invalidDataStreamHandler(); - throw invalid_data_stream_exception( - "unprocessed read buffer"); - } - _socketBuffer->setLimit(_storedLimit); - _socketBuffer->setPosition(newPosition); - break; - } + postProcessApplicationMessage(); + throw; } } } @@ -271,6 +223,63 @@ namespace epics { } } + void AbstractCodec::postProcessApplicationMessage() + { + if (!isOpen()) + return; + + // can be closed by now + // isOpen() should be efficiently implemented + while (true) + //while (isOpen()) + { + // set position as whole message was read + //(in case code haven't done so) + std::size_t newPosition = + alignedValue( + _storedPosition + _storedPayloadSize, PVA_ALIGNMENT); + + // aligned buffer size ensures that there is enough space + //in buffer, + // however data might not be fully read + + // discard the rest of the packet + if (newPosition > _storedLimit) + { + // processApplicationMessage() did not read up + //quite some buffer + + // we only handle unused alignment bytes + int bytesNotRead = + newPosition - _socketBuffer->getPosition(); + + if (bytesNotRead < PVA_ALIGNMENT) + { + // make alignment bytes as real payload to enable SPLIT + // no end-of-socket or segmented scenario can happen + // due to aligned buffer size + _storedPayloadSize += bytesNotRead; + // reveal currently existing padding + _socketBuffer->setLimit(_storedLimit); + ensureData(bytesNotRead); + _storedPayloadSize -= bytesNotRead; + continue; + } + + // TODO we do not handle this for now (maybe never) + LOG(logLevelWarn, + "unprocessed read buffer from client at %s:%d: %d," + " disconnecting...", + __FILE__, __LINE__, getLastReadBufferSocketAddress()); + invalidDataStreamHandler(); + throw invalid_data_stream_exception( + "unprocessed read buffer"); + } + _socketBuffer->setLimit(_storedLimit); + _socketBuffer->setPosition(newPosition); + break; + } + } void AbstractCodec::processReadSegmented() { @@ -1280,6 +1289,7 @@ namespace epics { timeout.tv_sec = 1; timeout.tv_usec = 0; + // TODO remove this and implement use epicsSocketSystemCallInterruptMechanismQuery if (unlikely(::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0)) { @@ -1450,8 +1460,7 @@ namespace epics { return bytesRead; } - //TODO check what to return - return -1; + return 0; } diff --git a/pvAccessApp/remote/codec.h b/pvAccessApp/remote/codec.h index 58339b5..7b75d62 100644 --- a/pvAccessApp/remote/codec.h +++ b/pvAccessApp/remote/codec.h @@ -45,7 +45,7 @@ namespace epics { namespace pvAccess { - + // TODO replace mutex with atomic (CAS) operations template class AtomicValue { @@ -280,8 +280,8 @@ namespace epics { static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE; AbstractCodec( - std::tr1::shared_ptr receiveBuffer, - std::tr1::shared_ptr sendBuffer, + std::tr1::shared_ptr const & receiveBuffer, + std::tr1::shared_ptr const & sendBuffer, int32_t socketSendBufferSize, bool blockingProcessQueue); @@ -343,7 +343,7 @@ namespace epics { int8_t _version; int8_t _flags; int8_t _command; - int32_t _payloadSize; + int32_t _payloadSize; // TODO why not size_t? epics::pvData::int32 _remoteTransportSocketReceiveBufferSize; int64_t _totalBytesSent; bool _blockingProcessQueue; @@ -357,13 +357,14 @@ namespace epics { std::tr1::shared_ptr _socketBuffer; std::tr1::shared_ptr _sendBuffer; - epics::pvAccess::queue _sendQueue; + epics::pvAccess::queue _sendQueue; private: void processHeader(); void processReadNormal(); - void processReadSegmented(); + void postProcessApplicationMessage(); + void processReadSegmented(); bool readToBuffer(std::size_t requiredBytes, bool persistent); void endMessage(bool hasMoreSegments); void processSender( @@ -394,8 +395,8 @@ namespace epics { POINTER_DEFINITIONS(BlockingAbstractCodec); BlockingAbstractCodec( - std::tr1::shared_ptr receiveBuffer, - std::tr1::shared_ptr sendBuffer, + std::tr1::shared_ptr const & receiveBuffer, + std::tr1::shared_ptr const & sendBuffer, int32_t socketSendBufferSize): AbstractCodec(receiveBuffer, sendBuffer, socketSendBufferSize, true), _readThread(0), _sendThread(0) { _isOpen.getAndSet(true);} diff --git a/pvAccessCPP.files b/pvAccessCPP.files index 6fbd1d9..82a19d7 100644 --- a/pvAccessCPP.files +++ b/pvAccessCPP.files @@ -40,6 +40,8 @@ pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp pvAccessApp/remote/simpleChannelSearchManagerImpl.h pvAccessApp/remote/transportRegistry.cpp pvAccessApp/remote/transportRegistry.h +pvAccessApp/remote/codec.cpp +pvAccessApp/remote/codec.h pvAccessApp/remoteClient/clientContextImpl.cpp pvAccessApp/remoteClient/clientContextImpl.h pvAccessApp/remoteClient/clientContextImpl.h.orig @@ -101,6 +103,8 @@ testApp/remote/testNTImage.cpp testApp/remote/testRemoteClientImpl.cpp testApp/remote/testServer.cpp testApp/remote/testServerContext.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testCodec.cpp testApp/utils/testAtomicBoolean.cpp testApp/utils/configurationTest.cpp testApp/utils/testHexDump.cpp diff --git a/pvAccessCPP.includes b/pvAccessCPP.includes index 471a178..d8138cf 100644 --- a/pvAccessCPP.includes +++ b/pvAccessCPP.includes @@ -7,4 +7,6 @@ /home/msekoranja/epicsV4/pvAccessCPP/pvAccessApp/rpcService /home/msekoranja/epicsV4/pvAccessCPP/pvAccessApp/server /home/msekoranja/epicsV4/pvAccessCPP/pvAccessApp/utils -/home/msekoranja/epicsV4/pvAccessCPP/testApp/remote \ No newline at end of file +/home/msekoranja/epicsV4/pvAccessCPP/testApp/remote +/home/msekoranja/epicsV4/pvAccessCPP/testApp/client +/home/msekoranja/epicsV4/pvAccessCPP/testApp/utils diff --git a/testApp/remote/testCodec.cpp b/testApp/remote/testCodec.cpp index 602ee82..41e9adb 100644 --- a/testApp/remote/testCodec.cpp +++ b/testApp/remote/testCodec.cpp @@ -397,7 +397,7 @@ namespace epics { public: int runAllTest() { - testPlan(5885); + testPlan(5884); testHeaderProcess(); testInvalidHeaderMagic(); testInvalidHeaderSegmentedInNormal(); @@ -518,9 +518,9 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == 0x00, + testOk(header._flags == (int8_t)0x00, "%s: header._flags == 0x00", CURRENT_FUNCTION); - testOk(header._command == 0x20, + testOk(header._command == (int8_t)0x20, "%s: header._command == 0x20", CURRENT_FUNCTION); testOk(header._payloadSize == 0x00000000, "%s: header._payloadSize == 0x00000000", CURRENT_FUNCTION); @@ -530,11 +530,11 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == 0x81, + testOk(header._flags == (int8_t)0x81, "%s: header._flags == 0x81", CURRENT_FUNCTION); - testOk(header._command == 0xEE, + testOk(header._command == (int8_t)0xEE, "%s: header._command == 0xEE", CURRENT_FUNCTION); - testOk((std::size_t)header._payloadSize == 0xDDCCBBAA, + testOk(header._payloadSize == (int32_t)0xDDCCBBAA, "%s: header._payloadSize == 0xDDCCBBAA", CURRENT_FUNCTION); } @@ -687,9 +687,9 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == 0x01, + testOk(header._flags == (int8_t)0x01, "%s: header._flags == 0x01", CURRENT_FUNCTION); - testOk(header._command == 0x23, + testOk(header._command == (int8_t)0x23, "%s: header._command == 0x23", CURRENT_FUNCTION); testOk(header._payloadSize == 0x456789AB, "%s: header._payloadSize == 0x456789AB", CURRENT_FUNCTION); @@ -1118,11 +1118,11 @@ namespace epics { testOk(msg._version == PVA_VERSION, "%s: msg._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(msg._flags == 0x81, + testOk(msg._flags == (int8_t)0x81, "%s: msg._flags == 0x81", CURRENT_FUNCTION); - testOk(msg._command == 0xEE, + testOk(msg._command == (int8_t)0xEE, "%s: msg._command == 0xEE", CURRENT_FUNCTION); - testOk((std::size_t)msg._payloadSize == 0xDDCCBBAA, + testOk(msg._payloadSize == (int32_t)0xDDCCBBAA, "%s: msg._payloadSize == 0xDDCCBBAA", CURRENT_FUNCTION); } @@ -1606,9 +1606,9 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == 0x81, - "%s: header._flags == 0x81", CURRENT_FUNCTION); - testOk(header._command == 0x23, + testOk(header._flags == (int8_t)((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00) | 0x01), + "%s: header._flags == 0x(0|8)1", CURRENT_FUNCTION); + testOk(header._command == (int8_t)0x23, "%s: header._command == 0x23", CURRENT_FUNCTION); testOk(header._payloadSize == 0x456789AB, "%s: header._payloadSize == 0x456789AB", CURRENT_FUNCTION); @@ -1644,9 +1644,9 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == 0x00, + testOk(header._flags == (int8_t)0x00, "%s: header._flags == 0x00", CURRENT_FUNCTION); - testOk(header._command == 0x20, + testOk(header._command == (int8_t)0x20, "%s: header._command == 0x20", CURRENT_FUNCTION); testOk(header._payloadSize == 0x00000000, "%s: header._payloadSize == 0x00000000", CURRENT_FUNCTION); @@ -1656,11 +1656,11 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == 0x81, + testOk(header._flags == (int8_t)0x81, "%s: header._flags == 0x81", CURRENT_FUNCTION); - testOk(header._command == 0xEE, + testOk(header._command == (int8_t)0xEE, "%s: header._command == 0xEE", CURRENT_FUNCTION); - testOk((std::size_t)header._payloadSize == 0xDDCCBBAA, + testOk(header._payloadSize == (int32_t)0xDDCCBBAA, "%s: header._payloadSize == 0xDDCCBBAA", CURRENT_FUNCTION); } @@ -2297,9 +2297,9 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == 0x80, - "%s: header._flags == 0x80", CURRENT_FUNCTION); - testOk(header._command == 0x20, + testOk(header._flags == (int8_t)((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00) | 0x00), + "%s: header._flags == 0x(0|8)0", CURRENT_FUNCTION); + testOk(header._command == (int8_t)0x20, "%s: header._command == 0x20", CURRENT_FUNCTION); testOk(header._payloadSize == 0x00000000, "%s: header._payloadSize == 0x00000000", CURRENT_FUNCTION); @@ -2309,11 +2309,11 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == 0x81, - "%s: header._flags == 0x81", CURRENT_FUNCTION); - testOk(header._command == 0xEE, + testOk(header._flags == (int8_t)((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00) | 0x01), + "%s: header._flags == 0x(0|8)1", CURRENT_FUNCTION); + testOk(header._command == (int8_t)0xEE, "%s: header._command == 0xEE", CURRENT_FUNCTION); - testOk((std::size_t)header._payloadSize == 0xDDCCBBAA, + testOk(header._payloadSize == (int32_t)0xDDCCBBAA, "%s: header._payloadSize == 0xDDCCBBAA", CURRENT_FUNCTION); } @@ -2441,8 +2441,8 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == 0x80, - "%s: header._flags == 0x80", CURRENT_FUNCTION); + testOk(header._flags == (int8_t)((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00) | 0x00), + "%s: header._flags == 0x(0|8)0", CURRENT_FUNCTION); testOk(header._command == 0x20, "%s: header._command == 0x20", CURRENT_FUNCTION); testOk(header._payloadSize == 0x00000000, @@ -2454,11 +2454,11 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == 0x81, - "%s: header._flags == 0x81", CURRENT_FUNCTION); - testOk(header._command == 0xEE, + testOk(header._flags == (int8_t)((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00) | 0x01), + "%s: header._flags == 0x(0|8)1", CURRENT_FUNCTION); + testOk(header._command == (int8_t)0xEE, "%s: header._command == 0xEE", CURRENT_FUNCTION); - testOk((std::size_t)header._payloadSize == 0xDDCCBBAA, + testOk(header._payloadSize == (int32_t)0xDDCCBBAA, "%s: header._payloadSize == 0xDDCCBBAA", CURRENT_FUNCTION); @@ -2499,11 +2499,11 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == 0x81, - "%s: header._flags == 0x81", CURRENT_FUNCTION); - testOk(header._command == 0xEE, + testOk(header._flags == (int8_t)((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00) | 0x01), + "%s: header._flags == 0x(0|8)1", CURRENT_FUNCTION); + testOk(header._command == (int8_t)0xEE, "%s: header._command == 0xEE", CURRENT_FUNCTION); - testOk((std::size_t)header._payloadSize == 0xDDCCBBAA, + testOk(header._payloadSize == (int32_t)0xDDCCBBAA, "%s: header._payloadSize == 0xDDCCBBAA", CURRENT_FUNCTION); } @@ -2582,11 +2582,11 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == 0x80, + testOk(header._flags == (int8_t)0x80, "%s: header._flags == 0x80", CURRENT_FUNCTION); - testOk(header._command == 0x12, + testOk(header._command == (int8_t)0x12, "%s: header._command == 0x12", CURRENT_FUNCTION); - testOk((std::size_t)header._payloadSize == bytesToSent, + testOk(header._payloadSize == (int32_t)bytesToSent, "%s: header._payloadSize == bytesToSent", CURRENT_FUNCTION); @@ -2789,8 +2789,8 @@ namespace epics { testOk(header._version == PVA_VERSION, "%s: header._version == PVA_VERSION", CURRENT_FUNCTION); - testOk(header._flags == (int8_t)(0x80 | 0x10), - "%s: header._flags == (int8_t)(0x80 | 0x10)", + testOk(header._flags == (int8_t)((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00) | 0x10), + "%s: header._flags == (int8_t)(0x(0|8)0 | 0x10)", CURRENT_FUNCTION); testOk(header._command == 0x12, "%s: header._command == 0x12", CURRENT_FUNCTION); @@ -2818,7 +2818,6 @@ namespace epics { void testRecipient() { // nothing to test, depends on implementation - testSkip(1, " testRecipient()"); }