server/client flag, _lastSegmentedMessageType != 0 comparison fix

This commit is contained in:
Matej Sekoranja
2014-07-22 00:35:29 +02:00
parent 3050323193
commit c01b928836
8 changed files with 44 additions and 21 deletions

View File

@@ -46,16 +46,18 @@ namespace epics {
POINTER_DEFINITIONS(BlockingUDPTransport);
private:
BlockingUDPTransport(std::auto_ptr<ResponseHandler>& responseHandler,
SOCKET channel, osiSockAddr& bindAddress,
BlockingUDPTransport(bool serverFlag,
std::auto_ptr<ResponseHandler> &responseHandler,
SOCKET channel, osiSockAddr &bindAddress,
short remoteTransportRevision);
public:
static shared_pointer create(std::auto_ptr<ResponseHandler>& responseHandler,
static shared_pointer create(bool serverFlag,
std::auto_ptr<ResponseHandler>& responseHandler,
SOCKET channel, osiSockAddr& bindAddress,
short remoteTransportRevision)
{
shared_pointer thisPointer(
new BlockingUDPTransport(responseHandler, channel, bindAddress, remoteTransportRevision)
new BlockingUDPTransport(serverFlag, responseHandler, channel, bindAddress, remoteTransportRevision)
);
return thisPointer;
}
@@ -358,6 +360,8 @@ namespace epics {
*/
epicsThreadId _threadId;
epics::pvData::int8 _clientServerWithEndianFlag;
};
class BlockingUDPConnector :
@@ -367,8 +371,10 @@ namespace epics {
POINTER_DEFINITIONS(BlockingUDPConnector);
BlockingUDPConnector(
bool serverFlag,
bool reuseSocket,
bool broadcast) :
_serverFlag(serverFlag),
_reuseSocket(reuseSocket),
_broadcast(broadcast) {
}
@@ -385,6 +391,11 @@ namespace epics {
private:
/**
* Client/server flag.
*/
bool _serverFlag;
/**
* Reuse socket flag.
*/

View File

@@ -68,7 +68,8 @@ namespace epics {
}
// sockets are blocking by default
Transport::shared_pointer transport = BlockingUDPTransport::create(responseHandler, socket, bindAddress, transportRevision);
Transport::shared_pointer transport = BlockingUDPTransport::create(_serverFlag,
responseHandler, socket, bindAddress, transportRevision);
return transport;
}

View File

@@ -36,6 +36,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
PVACCESS_REFCOUNT_MONITOR_DEFINE(blockingUDPTransport);
BlockingUDPTransport::BlockingUDPTransport(
bool serverFlag,
auto_ptr<ResponseHandler>& responseHandler, SOCKET channel,
osiSockAddr& bindAddress,
short /*remoteTransportRevision*/) :
@@ -49,7 +50,9 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
_receiveBuffer(new ByteBuffer(MAX_UDP_RECV)),
_sendBuffer(new ByteBuffer(MAX_UDP_RECV)),
_lastMessageStartPosition(0),
_threadId(0)
_threadId(0),
_clientServerWithEndianFlag(
(serverFlag ? 0x40 : 0x00) | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00))
{
PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(blockingUDPTransport);
@@ -181,7 +184,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
_lastMessageStartPosition = _sendBuffer->getPosition();
_sendBuffer->putByte(PVA_MAGIC);
_sendBuffer->putByte(PVA_VERSION);
_sendBuffer->putByte((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00); // data + 7-bit endianess
_sendBuffer->putByte(_clientServerWithEndianFlag);
_sendBuffer->putByte(command); // command
_sendBuffer->putInt(payloadSize);
}

View File

@@ -40,6 +40,7 @@ namespace epics {
const std::size_t AbstractCodec::MAX_ENSURE_DATA_BUFFER_SIZE = 1024;
AbstractCodec::AbstractCodec(
bool serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & receiveBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & sendBuffer,
int32_t socketSendBufferSize,
@@ -58,6 +59,7 @@ namespace epics {
_lastMessageStartPosition(std::numeric_limits<size_t>::max()),_lastSegmentedMessageType(0),
_lastSegmentedMessageCommand(0), _nextMessagePayloadOffset(0),
_byteOrderFlag(EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00),
_clientServerFlag(serverFlag ? 0x40 : 0x00),
_socketSendBufferSize(0)
{
if (receiveBuffer->getSize() < 2*MAX_ENSURE_SIZE)
@@ -574,7 +576,6 @@ namespace epics {
epics::pvData::int8 command,
std::size_t ensureCapacity,
epics::pvData::int32 payloadSize) {
_lastMessageStartPosition =
std::numeric_limits<size_t>::max(); // TODO revise this
ensureBuffer(
@@ -583,7 +584,7 @@ namespace epics {
_sendBuffer->putByte(PVA_MAGIC);
_sendBuffer->putByte(PVA_VERSION);
_sendBuffer->putByte(
(_lastSegmentedMessageType | _byteOrderFlag)); // data + endian
(_lastSegmentedMessageType | _byteOrderFlag | _clientServerFlag)); // data message
_sendBuffer->putByte(command); // command
_sendBuffer->putInt(payloadSize);
@@ -603,7 +604,7 @@ namespace epics {
ensureBuffer(PVA_MESSAGE_HEADER_SIZE);
_sendBuffer->putByte(PVA_MAGIC);
_sendBuffer->putByte(PVA_VERSION);
_sendBuffer->putByte((0x01 | _byteOrderFlag)); // control + endian
_sendBuffer->putByte((0x01 | _byteOrderFlag | _clientServerFlag)); // control message
_sendBuffer->putByte(command); // command
_sendBuffer->putInt(data); // data
}
@@ -649,8 +650,7 @@ namespace epics {
else
{
// last segment
if (_lastSegmentedMessageType !=
std::numeric_limits<size_t>::max())
if (_lastSegmentedMessageType != 0)
{
std::size_t flagsPosition = _lastMessageStartPosition + 2;
// set last segment bit (by clearing first segment bit)
@@ -1240,10 +1240,12 @@ namespace epics {
BlockingSocketAbstractCodec::BlockingSocketAbstractCodec(
bool serverFlag,
SOCKET channel,
int32_t sendBufferSize,
int32_t receiveBufferSize):
BlockingAbstractCodec(
serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer>(new ByteBuffer((std::max<std::size_t>((std::size_t)(
MAX_TCP_RECV + MAX_ENSURE_DATA_BUFFER_SIZE), receiveBufferSize) +
(PVA_ALIGNMENT - 1)) & (~(PVA_ALIGNMENT - 1)))),
@@ -1456,7 +1458,7 @@ namespace epics {
std::auto_ptr<ResponseHandler>& responseHandler,
int32_t sendBufferSize,
int32_t receiveBufferSize) :
BlockingTCPTransportCodec(context, channel, responseHandler,
BlockingTCPTransportCodec(true, context, channel, responseHandler,
sendBufferSize, receiveBufferSize, PVA_DEFAULT_PRIORITY),
_lastChannelSID(0), _verifyOrVerified(false)
{
@@ -1622,7 +1624,7 @@ namespace epics {
epics::pvData::int8 /*remoteTransportRevision*/,
float beaconInterval,
int16_t priority ) :
BlockingTCPTransportCodec(context, channel, responseHandler,
BlockingTCPTransportCodec(false, context, channel, responseHandler,
sendBufferSize, receiveBufferSize, priority),
_connectionTimeout(beaconInterval*1000),
_unresponsiveTransport(false),

View File

@@ -215,6 +215,7 @@ namespace epics {
static const std::size_t MAX_ENSURE_DATA_BUFFER_SIZE;
AbstractCodec(
bool serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & receiveBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & sendBuffer,
int32_t socketSendBufferSize,
@@ -326,7 +327,8 @@ namespace epics {
std::size_t _nextMessagePayloadOffset;
epics::pvData::int8 _byteOrderFlag;
int32_t _socketSendBufferSize;
epics::pvData::int8 _clientServerFlag;
int32_t _socketSendBufferSize;
};
@@ -340,10 +342,11 @@ namespace epics {
POINTER_DEFINITIONS(BlockingAbstractCodec);
BlockingAbstractCodec(
bool serverFlag,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & receiveBuffer,
std::tr1::shared_ptr<epics::pvData::ByteBuffer> const & sendBuffer,
int32_t socketSendBufferSize):
AbstractCodec(receiveBuffer, sendBuffer, socketSendBufferSize, true),
AbstractCodec(serverFlag, receiveBuffer, sendBuffer, socketSendBufferSize, true),
_readThread(0), _sendThread(0) { _isOpen.getAndSet(true);}
void readPollOne();
@@ -391,6 +394,7 @@ namespace epics {
public:
BlockingSocketAbstractCodec(
bool serverFlag,
SOCKET channel,
int32_t sendBufferSize,
int32_t receiveBufferSize);
@@ -521,6 +525,7 @@ namespace epics {
protected:
BlockingTCPTransportCodec(
bool serverFlag,
Context::shared_pointer const & context,
SOCKET channel,
std::auto_ptr<ResponseHandler>& responseHandler,
@@ -528,7 +533,7 @@ namespace epics {
int32_t receiveBufferSize,
epics::pvData::int16 priority
):
BlockingSocketAbstractCodec(channel, sendBufferSize, receiveBufferSize),
BlockingSocketAbstractCodec(serverFlag, channel, sendBufferSize, receiveBufferSize),
_context(context), _responseHandler(responseHandler),
_remoteTransportReceiveBufferSize(MAX_TCP_RECV),
_remoteTransportRevision(0), _priority(priority),

View File

@@ -4159,7 +4159,7 @@ TODO
TransportClient::shared_pointer nullTransportClient;
auto_ptr<ResponseHandler> clientResponseHandler(new ClientResponseHandler(thisPointer));
auto_ptr<BlockingUDPConnector> broadcastConnector(new BlockingUDPConnector(true, true));
auto_ptr<BlockingUDPConnector> broadcastConnector(new BlockingUDPConnector(false, true, true));
m_broadcastTransport = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
nullTransportClient, clientResponseHandler,
listenLocalAddress, PVA_PROTOCOL_REVISION,
@@ -4175,7 +4175,7 @@ TODO
undefinedAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
clientResponseHandler.reset(new ClientResponseHandler(thisPointer));
auto_ptr<BlockingUDPConnector> searchConnector(new BlockingUDPConnector(false, true));
auto_ptr<BlockingUDPConnector> searchConnector(new BlockingUDPConnector(false, false, true));
m_searchTransport = static_pointer_cast<BlockingUDPTransport>(searchConnector->connect(
nullTransportClient, clientResponseHandler,
undefinedAddress, PVA_PROTOCOL_REVISION,

View File

@@ -260,7 +260,7 @@ void ServerContextImpl::initializeBroadcastTransport()
TransportClient::shared_pointer nullTransportClient;
auto_ptr<BlockingUDPConnector> broadcastConnector(new BlockingUDPConnector(true, true));
auto_ptr<BlockingUDPConnector> broadcastConnector(new BlockingUDPConnector(true, true, true));
auto_ptr<epics::pvAccess::ResponseHandler> responseHandler = createResponseHandler();
_broadcastTransport = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
nullTransportClient, responseHandler,

View File

@@ -68,10 +68,11 @@ namespace epics {
std::size_t sendBufferSize,
bool blocking = false):
AbstractCodec(
false,
std::tr1::shared_ptr<epics::pvData::ByteBuffer>(new ByteBuffer(receiveBufferSize)),
std::tr1::shared_ptr<epics::pvData::ByteBuffer>(new ByteBuffer(sendBufferSize)),
sendBufferSize/10,
blocking ),
blocking),
_closedCount(0),
_invalidDataStreamCount(0),
_scheduleSendCount(0),