manual merge

This commit is contained in:
Matej Sekoranja
2016-03-02 13:34:05 +01:00
39 changed files with 2180 additions and 1413 deletions

View File

@@ -258,16 +258,25 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom,
// TODO DoS attack?
const bool responseRequired = (QOS_REPLY_REQUIRED & qosCode) != 0;
// TODO bloom filter or similar server selection (by GUID)
//
// locally broadcast if unicast (qosCode & 0x80 == 0x80)
// locally broadcast if unicast (qosCode & 0x80 == 0x80) via UDP
//
if ((qosCode & 0x80) == 0x80)
{
BlockingUDPTransport::shared_pointer bt = _context->getLocalMulticastTransport();
if (bt)
BlockingUDPTransport::shared_pointer bt = dynamic_pointer_cast<BlockingUDPTransport>(transport);
if (bt && bt->hasLocalMulticastAddress())
{
// RECEIVE_BUFFER_PRE_RESERVE allows to pre-fix message
size_t newStartPos = (startPosition-PVA_MESSAGE_HEADER_SIZE)-PVA_MESSAGE_HEADER_SIZE-16;
payloadBuffer->setPosition(newStartPos);
// copy part of a header, and add: command, payloadSize, NIF address
payloadBuffer->put(payloadBuffer->getArray(), startPosition-PVA_MESSAGE_HEADER_SIZE, PVA_MESSAGE_HEADER_SIZE-5);
payloadBuffer->putByte(CMD_ORIGIN_TAG);
payloadBuffer->putInt(16);
// encode this socket bind address
encodeAsIPv6Address(payloadBuffer, bt->getBindAddress());
// clear unicast flag
payloadBuffer->put(startPosition+4, (int8)(qosCode & ~0x80));
@@ -275,9 +284,12 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom,
payloadBuffer->setPosition(startPosition+8);
encodeAsIPv6Address(payloadBuffer, &responseAddress);
payloadBuffer->setPosition(payloadBuffer->getLimit()); // send will call flip()
// set to end of a message
payloadBuffer->setPosition(payloadBuffer->getLimit());
bt->send(payloadBuffer->getArray()+newStartPos, payloadBuffer->getPosition()-newStartPos,
bt->getLocalMulticastAddress());
bt->send(payloadBuffer);
return;
}
}
@@ -760,7 +772,7 @@ ChannelRequester::shared_pointer ServerChannelRequesterImpl::create(
std::tr1::shared_ptr<ServerChannelRequesterImpl> tp(new ServerChannelRequesterImpl(transport, channelName, cid, css));
ChannelRequester::shared_pointer cr = tp;
// TODO exception guard and report error back
provider->createChannel(channelName, cr, transport->getPriority());
provider->createChannel(channelName, cr, transport->getPriority());
return cr;
}
@@ -817,7 +829,7 @@ void ServerChannelRequesterImpl::channelCreated(const Status& status, Channel::s
LOG(logLevelDebug, "Exception caught when creating channel: %s", _channelName.c_str());
{
Lock guard(_mutex);
_status = Status(Status::STATUSTYPE_FATAL, "failed to create channel", e.what());
_status = Status(Status::STATUSTYPE_FATAL, "failed to create channel", e.what());
}
TransportSender::shared_pointer thisSender = shared_from_this();
transport->enqueueSendRequest(thisSender);
@@ -830,7 +842,7 @@ void ServerChannelRequesterImpl::channelCreated(const Status& status, Channel::s
LOG(logLevelDebug, "Exception caught when creating channel: %s", _channelName.c_str());
{
Lock guard(_mutex);
_status = Status(Status::STATUSTYPE_FATAL, "failed to create channel");
_status = Status(Status::STATUSTYPE_FATAL, "failed to create channel");
}
TransportSender::shared_pointer thisSender = shared_from_this();
transport->enqueueSendRequest(thisSender);
@@ -841,16 +853,45 @@ void ServerChannelRequesterImpl::channelCreated(const Status& status, Channel::s
}
}
void ServerChannelRequesterImpl::channelStateChange(Channel::shared_pointer const & /*channel*/, const Channel::ConnectionState /*isConnected*/)
void ServerChannelRequesterImpl::channelStateChange(Channel::shared_pointer const & /*channel*/, const Channel::ConnectionState isConnected)
{
// TODO should we notify remote side?
if(isConnected==Channel::CONNECTED || isConnected==Channel::NEVER_CONNECTED)
return;
if(Transport::shared_pointer transport = _transport.lock())
{
ChannelHostingTransport::shared_pointer casTransport = dynamic_pointer_cast<ChannelHostingTransport>(transport);
if (!casTransport)
return;
ServerChannelImpl::shared_pointer channel;
{
Lock guard(_mutex);
channel= dynamic_pointer_cast<ServerChannelImpl>(_serverChannel.lock());
}
if (!channel)
return;
// destroy
channel->destroy();
// .. and unregister
casTransport->unregisterChannel(channel->getSID());
// send response back
TransportSender::shared_pointer sr(new ServerDestroyChannelHandlerTransportSender(channel->getCID(), channel->getSID()));
transport->enqueueSendRequest(sr);
}
}
string ServerChannelRequesterImpl::getRequesterName()
{
std::stringstream name;
name << "ServerChannelRequesterImpl/" << _channelName << "[" << _cid << "]";
return name.str();
Transport::shared_pointer transport = _transport.lock();
if (transport)
return transport->getRemoteName();
else
return "<unknown>:0";
}
void ServerChannelRequesterImpl::message(std::string const & message, MessageType messageType)
@@ -878,32 +919,29 @@ void ServerChannelRequesterImpl::send(ByteBuffer* buffer, TransportSendControl*
status = _status;
}
// error response
if (serverChannel.get() == NULL)
{
createChannelFailedResponse(buffer, control, status);
}
// OK
else if (Transport::shared_pointer transport = _transport.lock())
{
ServerChannelImpl::shared_pointer serverChannelImpl = dynamic_pointer_cast<ServerChannelImpl>(serverChannel);
control->startMessage((int8)CMD_CREATE_CHANNEL, 2*sizeof(int32)/sizeof(int8));
buffer->putInt(serverChannelImpl->getCID());
buffer->putInt(serverChannelImpl->getSID());
status.serialize(buffer, control);
}
}
void ServerChannelRequesterImpl::createChannelFailedResponse(ByteBuffer* buffer, TransportSendControl* control, const Status& status)
{
if (Transport::shared_pointer transport = _transport.lock())
{
control->startMessage((int8)CMD_CREATE_CHANNEL, 2*sizeof(int32)/sizeof(int8));
buffer->putInt(_cid);
buffer->putInt(-1);
status.serialize(buffer, control);
}
if (Transport::shared_pointer transport = _transport.lock())
{
// error response
if (!serverChannel)
{
control->startMessage((int8)CMD_CREATE_CHANNEL, 2*sizeof(int32)/sizeof(int8));
buffer->putInt(_cid);
buffer->putInt(-1);
// error status is expected or channel has been destroyed locally
if (status.isSuccess())
status = Status(Status::STATUSTYPE_ERROR, "channel has been destroyed");
status.serialize(buffer, control);
}
// OK
else
{
ServerChannelImpl::shared_pointer serverChannelImpl = dynamic_pointer_cast<ServerChannelImpl>(serverChannel);
control->startMessage((int8)CMD_CREATE_CHANNEL, 2*sizeof(int32)/sizeof(int8));
buffer->putInt(serverChannelImpl->getCID());
buffer->putInt(serverChannelImpl->getSID());
status.serialize(buffer, control);
}
}
}
/****************************************************************************************/
@@ -919,7 +957,7 @@ void ServerDestroyChannelHandler::handleResponse(osiSockAddr* responseFrom,
ChannelHostingTransport::shared_pointer casTransport = dynamic_pointer_cast<ChannelHostingTransport>(transport);
transport->ensureData(2*sizeof(int32)/sizeof(int8));
transport->ensureData(8);
const pvAccessID sid = payloadBuffer->getInt();
const pvAccessID cid = payloadBuffer->getInt();
@@ -1989,7 +2027,7 @@ void ServerMonitorRequesterImpl::monitorConnect(const Status& status, Monitor::s
{
Lock guard(_mutex);
_status = status;
_channelMonitor = monitor;
_channelMonitor = monitor; //TODO inconsistent locking for _channelMonitor
_structure = structure;
}
TransportSender::shared_pointer thisSender = shared_from_this();