diff --git a/src/remote/blockingUDP.h b/src/remote/blockingUDP.h index 6956c05..938e32a 100644 --- a/src/remote/blockingUDP.h +++ b/src/remote/blockingUDP.h @@ -46,16 +46,18 @@ namespace epics { POINTER_DEFINITIONS(BlockingUDPTransport); private: - BlockingUDPTransport(std::auto_ptr& responseHandler, - SOCKET channel, osiSockAddr& bindAddress, + BlockingUDPTransport(bool serverFlag, + std::auto_ptr &responseHandler, + SOCKET channel, osiSockAddr &bindAddress, short remoteTransportRevision); public: - static shared_pointer create(std::auto_ptr& responseHandler, + static shared_pointer create(bool serverFlag, + std::auto_ptr& responseHandler, SOCKET channel, osiSockAddr& bindAddress, short remoteTransportRevision) { shared_pointer thisPointer( - new BlockingUDPTransport(responseHandler, channel, bindAddress, remoteTransportRevision) + new BlockingUDPTransport(serverFlag, responseHandler, channel, bindAddress, remoteTransportRevision) ); return thisPointer; } @@ -358,6 +360,8 @@ namespace epics { */ epicsThreadId _threadId; + epics::pvData::int8 _clientServerWithEndianFlag; + }; class BlockingUDPConnector : @@ -367,8 +371,10 @@ namespace epics { POINTER_DEFINITIONS(BlockingUDPConnector); BlockingUDPConnector( + bool serverFlag, bool reuseSocket, bool broadcast) : + _serverFlag(serverFlag), _reuseSocket(reuseSocket), _broadcast(broadcast) { } @@ -385,6 +391,11 @@ namespace epics { private: + /** + * Client/server flag. + */ + bool _serverFlag; + /** * Reuse socket flag. */ diff --git a/src/remote/blockingUDPConnector.cpp b/src/remote/blockingUDPConnector.cpp index a0a9ac4..2d919ab 100644 --- a/src/remote/blockingUDPConnector.cpp +++ b/src/remote/blockingUDPConnector.cpp @@ -68,7 +68,8 @@ namespace epics { } // sockets are blocking by default - Transport::shared_pointer transport = BlockingUDPTransport::create(responseHandler, socket, bindAddress, transportRevision); + Transport::shared_pointer transport = BlockingUDPTransport::create(_serverFlag, + responseHandler, socket, bindAddress, transportRevision); return transport; } diff --git a/src/remote/blockingUDPTransport.cpp b/src/remote/blockingUDPTransport.cpp index a9455e7..9cc40a2 100644 --- a/src/remote/blockingUDPTransport.cpp +++ b/src/remote/blockingUDPTransport.cpp @@ -36,6 +36,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so PVACCESS_REFCOUNT_MONITOR_DEFINE(blockingUDPTransport); BlockingUDPTransport::BlockingUDPTransport( + bool serverFlag, auto_ptr& responseHandler, SOCKET channel, osiSockAddr& bindAddress, short /*remoteTransportRevision*/) : @@ -49,7 +50,9 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so _receiveBuffer(new ByteBuffer(MAX_UDP_RECV)), _sendBuffer(new ByteBuffer(MAX_UDP_RECV)), _lastMessageStartPosition(0), - _threadId(0) + _threadId(0), + _clientServerWithEndianFlag( + (serverFlag ? 0x40 : 0x00) | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00)) { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(blockingUDPTransport); @@ -181,7 +184,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so _lastMessageStartPosition = _sendBuffer->getPosition(); _sendBuffer->putByte(PVA_MAGIC); _sendBuffer->putByte(PVA_VERSION); - _sendBuffer->putByte((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00); // data + 7-bit endianess + _sendBuffer->putByte(_clientServerWithEndianFlag); _sendBuffer->putByte(command); // command _sendBuffer->putInt(payloadSize); } diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index 5f996cf..00cffc9 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -40,6 +40,7 @@ namespace epics { const std::size_t AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE = 1024; AbstractCodec::AbstractCodec( + bool serverFlag, std::tr1::shared_ptr const & receiveBuffer, std::tr1::shared_ptr const & sendBuffer, int32_t socketSendBufferSize, @@ -58,6 +59,7 @@ namespace epics { _lastMessageStartPosition(std::numeric_limits::max()),_lastSegmentedMessageType(0), _lastSegmentedMessageCommand(0), _nextMessagePayloadOffset(0), _byteOrderFlag(EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00), + _clientServerFlag(serverFlag ? 0x40 : 0x00), _socketSendBufferSize(0) { if (receiveBuffer->getSize() < 2*MAX_ENSURE_SIZE) @@ -574,7 +576,6 @@ namespace epics { epics::pvData::int8 command, std::size_t ensureCapacity, epics::pvData::int32 payloadSize) { - _lastMessageStartPosition = std::numeric_limits::max(); // TODO revise this ensureBuffer( @@ -583,7 +584,7 @@ namespace epics { _sendBuffer->putByte(PVA_MAGIC); _sendBuffer->putByte(PVA_VERSION); _sendBuffer->putByte( - (_lastSegmentedMessageType | _byteOrderFlag)); // data + endian + (_lastSegmentedMessageType | _byteOrderFlag | _clientServerFlag)); // data message _sendBuffer->putByte(command); // command _sendBuffer->putInt(payloadSize); @@ -603,7 +604,7 @@ namespace epics { ensureBuffer(PVA_MESSAGE_HEADER_SIZE); _sendBuffer->putByte(PVA_MAGIC); _sendBuffer->putByte(PVA_VERSION); - _sendBuffer->putByte((0x01 | _byteOrderFlag)); // control + endian + _sendBuffer->putByte((0x01 | _byteOrderFlag | _clientServerFlag)); // control message _sendBuffer->putByte(command); // command _sendBuffer->putInt(data); // data } @@ -649,8 +650,7 @@ namespace epics { else { // last segment - if (_lastSegmentedMessageType != - std::numeric_limits::max()) + if (_lastSegmentedMessageType != 0) { std::size_t flagsPosition = _lastMessageStartPosition + 2; // set last segment bit (by clearing first segment bit) @@ -1240,10 +1240,12 @@ namespace epics { BlockingSocketAbstractCodec::BlockingSocketAbstractCodec( + bool serverFlag, SOCKET channel, int32_t sendBufferSize, int32_t receiveBufferSize): BlockingAbstractCodec( + serverFlag, std::tr1::shared_ptr(new ByteBuffer((std::max((std::size_t)( MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) + (PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))), @@ -1456,7 +1458,7 @@ namespace epics { std::auto_ptr& responseHandler, int32_t sendBufferSize, int32_t receiveBufferSize) : - BlockingTCPTransportCodec(context, channel, responseHandler, + BlockingTCPTransportCodec(true, context, channel, responseHandler, sendBufferSize, receiveBufferSize, PVA_DEFAULT_PRIORITY), _lastChannelSID(0), _verifyOrVerified(false) { @@ -1622,7 +1624,7 @@ namespace epics { epics::pvData::int8 /*remoteTransportRevision*/, float beaconInterval, int16_t priority ) : - BlockingTCPTransportCodec(context, channel, responseHandler, + BlockingTCPTransportCodec(false, context, channel, responseHandler, sendBufferSize, receiveBufferSize, priority), _connectionTimeout(beaconInterval*1000), _unresponsiveTransport(false), diff --git a/src/remote/codec.h b/src/remote/codec.h index ee4a4f2..1660de8 100644 --- a/src/remote/codec.h +++ b/src/remote/codec.h @@ -215,6 +215,7 @@ namespace epics { static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE; AbstractCodec( + bool serverFlag, std::tr1::shared_ptr const & receiveBuffer, std::tr1::shared_ptr const & sendBuffer, int32_t socketSendBufferSize, @@ -326,7 +327,8 @@ namespace epics { std::size_t _nextMessagePayloadOffset; epics::pvData::int8 _byteOrderFlag; - int32_t _socketSendBufferSize; + epics::pvData::int8 _clientServerFlag; + int32_t _socketSendBufferSize; }; @@ -340,10 +342,11 @@ namespace epics { POINTER_DEFINITIONS(BlockingAbstractCodec); BlockingAbstractCodec( + bool serverFlag, std::tr1::shared_ptr const & receiveBuffer, std::tr1::shared_ptr const & sendBuffer, int32_t socketSendBufferSize): - AbstractCodec(receiveBuffer, sendBuffer, socketSendBufferSize, true), + AbstractCodec(serverFlag, receiveBuffer, sendBuffer, socketSendBufferSize, true), _readThread(0), _sendThread(0) { _isOpen.getAndSet(true);} void readPollOne(); @@ -391,6 +394,7 @@ namespace epics { public: BlockingSocketAbstractCodec( + bool serverFlag, SOCKET channel, int32_t sendBufferSize, int32_t receiveBufferSize); @@ -521,6 +525,7 @@ namespace epics { protected: BlockingTCPTransportCodec( + bool serverFlag, Context::shared_pointer const & context, SOCKET channel, std::auto_ptr& responseHandler, @@ -528,7 +533,7 @@ namespace epics { int32_t receiveBufferSize, epics::pvData::int16 priority ): - BlockingSocketAbstractCodec(channel, sendBufferSize, receiveBufferSize), + BlockingSocketAbstractCodec(serverFlag, channel, sendBufferSize, receiveBufferSize), _context(context), _responseHandler(responseHandler), _remoteTransportReceiveBufferSize(MAX_TCP_RECV), _remoteTransportRevision(0), _priority(priority), diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 6723ead..9775d5c 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -4159,7 +4159,7 @@ TODO TransportClient::shared_pointer nullTransportClient; auto_ptr clientResponseHandler(new ClientResponseHandler(thisPointer)); - auto_ptr broadcastConnector(new BlockingUDPConnector(true, true)); + auto_ptr broadcastConnector(new BlockingUDPConnector(false, true, true)); m_broadcastTransport = static_pointer_cast(broadcastConnector->connect( nullTransportClient, clientResponseHandler, listenLocalAddress, PVA_PROTOCOL_REVISION, @@ -4175,7 +4175,7 @@ TODO undefinedAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); clientResponseHandler.reset(new ClientResponseHandler(thisPointer)); - auto_ptr searchConnector(new BlockingUDPConnector(false, true)); + auto_ptr searchConnector(new BlockingUDPConnector(false, false, true)); m_searchTransport = static_pointer_cast(searchConnector->connect( nullTransportClient, clientResponseHandler, undefinedAddress, PVA_PROTOCOL_REVISION, diff --git a/src/server/serverContext.cpp b/src/server/serverContext.cpp index 12b32cb..48912cf 100644 --- a/src/server/serverContext.cpp +++ b/src/server/serverContext.cpp @@ -260,7 +260,7 @@ void ServerContextImpl::initializeBroadcastTransport() TransportClient::shared_pointer nullTransportClient; - auto_ptr broadcastConnector(new BlockingUDPConnector(true, true)); + auto_ptr broadcastConnector(new BlockingUDPConnector(true, true, true)); auto_ptr responseHandler = createResponseHandler(); _broadcastTransport = static_pointer_cast(broadcastConnector->connect( nullTransportClient, responseHandler, diff --git a/testApp/remote/testCodec.cpp b/testApp/remote/testCodec.cpp index 1fa3246..1a0d4b5 100644 --- a/testApp/remote/testCodec.cpp +++ b/testApp/remote/testCodec.cpp @@ -68,10 +68,11 @@ namespace epics { std::size_t sendBufferSize, bool blocking = false): AbstractCodec( + false, std::tr1::shared_ptr(new ByteBuffer(receiveBufferSize)), std::tr1::shared_ptr(new ByteBuffer(sendBufferSize)), sendBufferSize/10, - blocking ), + blocking), _closedCount(0), _invalidDataStreamCount(0), _scheduleSendCount(0),