udp transport initialization refactoring (deduplication)

This commit is contained in:
Matej Sekoranja
2016-03-02 12:37:58 +01:00
parent 87fa71070d
commit 0db4a9a342
6 changed files with 305 additions and 454 deletions

View File

@@ -357,36 +357,40 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
// CMD_ORIGIN_TAG filtering
// NOTE: from design point of view this is not a right place to process application message here
if (unlikely((command == CMD_ORIGIN_TAG) && _tappedNIF))
if (unlikely(command == CMD_ORIGIN_TAG))
{
// 128-bit IPv6 address
osiSockAddr originNIFAddress;
if (decodeAsIPv6Address(receiveBuffer, &originNIFAddress))
// enabled?
if (_tappedNIF)
{
originNIFAddress.ia.sin_family = AF_INET;
/*
LOG(logLevelDebug, "Got CMD_ORIGIN_TAG message form %s tagged as %s.",
inetAddressToString(fromAddress, true).c_str(),
inetAddressToString(originNIFAddress, false).c_str());
*/
// filter
if (originNIFAddress.ia.sin_addr.s_addr != htonl(INADDR_ANY))
// 128-bit IPv6 address
osiSockAddr originNIFAddress;
if (decodeAsIPv6Address(receiveBuffer, &originNIFAddress))
{
bool accept = false;
for(size_t i = 0; i < _tappedNIF->size(); i++)
{
if((*_tappedNIF)[i].ia.sin_addr.s_addr == originNIFAddress.ia.sin_addr.s_addr)
{
accept = true;
break;
}
}
originNIFAddress.ia.sin_family = AF_INET;
// ignore messages from non-tapped NIFs
if (!accept)
return false;
/*
LOG(logLevelDebug, "Got CMD_ORIGIN_TAG message form %s tagged as %s.",
inetAddressToString(fromAddress, true).c_str(),
inetAddressToString(originNIFAddress, false).c_str());
*/
// filter
if (originNIFAddress.ia.sin_addr.s_addr != htonl(INADDR_ANY))
{
bool accept = false;
for(size_t i = 0; i < _tappedNIF->size(); i++)
{
if((*_tappedNIF)[i].ia.sin_addr.s_addr == originNIFAddress.ia.sin_addr.s_addr)
{
accept = true;
break;
}
}
// ignore messages from non-tapped NIFs
if (!accept)
return false;
}
}
}
}
@@ -408,6 +412,12 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
bool BlockingUDPTransport::send(const char* buffer, size_t length, const osiSockAddr& address)
{
if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug, "Sending %d bytes to %s.",
length, inetAddressToString(address).c_str());
}
int retval = sendto(_channel, buffer,
length, 0, &(address.sa), sizeof(sockaddr));
if(unlikely(retval<0))
@@ -425,6 +435,13 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
bool BlockingUDPTransport::send(ByteBuffer* buffer, const osiSockAddr& address) {
buffer->flip();
if (IS_LOGGABLE(logLevelDebug))
{
LOG(logLevelDebug, "Sending %d bytes to %s.",
buffer->getRemaining(), inetAddressToString(address).c_str());
}
int retval = sendto(_channel, buffer->getArray(),
buffer->getLimit(), 0, &(address.sa), sizeof(sockaddr));
if(unlikely(retval<0))
@@ -552,6 +569,241 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
}
void initializeUDPTransports(bool serverFlag,
BlockingUDPTransportVector& udpTransports,
const IfaceNodeVector& ifaceList,
const ResponseHandler::shared_pointer& responseHandler,
BlockingUDPTransport::shared_pointer& sendTransport,
int32& listenPort,
bool autoAddressList,
const std::string& addressList,
const std::string& ignoreAddressList)
{
TransportClient::shared_pointer nullTransportClient;
auto_ptr<BlockingUDPConnector> connector(new BlockingUDPConnector(serverFlag, true, true));
// TODO configurable local NIF, address
osiSockAddr loAddr;
getLoopbackNIF(loAddr, "", 0);
// TODO configurable local multicast address
std::string mcastAddress("224.0.0.128");
osiSockAddr group;
aToIPAddr(mcastAddress.c_str(), listenPort, &group.ia);
//
// set ignore address list
//
auto_ptr<InetAddrVector> ignoreAddressVector;
if (!ignoreAddressList.empty())
ignoreAddressVector.reset(getSocketAddressList(ignoreAddressList, 0, 0));
//
// Setup UDP trasport(s) (per interface)
//
InetAddrVector tappedNIF;
for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++)
{
ifaceNode node = *iter;
LOG(logLevelDebug, "Setting up UDP for interface %s, broadcast %s.",
inetAddressToString(node.ifaceAddr, false).c_str(),
inetAddressToString(node.ifaceBCast, false).c_str());
try
{
// where to bind (listen) address
osiSockAddr listenLocalAddress;
listenLocalAddress.ia.sin_family = AF_INET;
listenLocalAddress.ia.sin_port = htons(listenPort);
listenLocalAddress.ia.sin_addr.s_addr = node.ifaceAddr.ia.sin_addr.s_addr;
BlockingUDPTransport::shared_pointer transport = static_pointer_cast<BlockingUDPTransport>(connector->connect(
nullTransportClient, responseHandler,
listenLocalAddress, PVA_PROTOCOL_REVISION,
PVA_DEFAULT_PRIORITY));
listenLocalAddress = *transport->getRemoteAddress();
// to allow automatic assignment of listen port (for testing)
if (listenPort == 0)
{
listenPort = ntohs(listenLocalAddress.ia.sin_port);
aToIPAddr(mcastAddress.c_str(), listenPort, &group.ia);
LOG(logLevelDebug, "Dynamic listen UDP port set to %d.", listenPort);
}
if (ignoreAddressVector.get() && ignoreAddressVector->size())
transport->setIgnoredAddresses(ignoreAddressVector.get());
tappedNIF.push_back(listenLocalAddress);
BlockingUDPTransport::shared_pointer transport2;
if(node.ifaceBCast.ia.sin_family == AF_UNSPEC ||
node.ifaceBCast.ia.sin_addr.s_addr == listenLocalAddress.ia.sin_addr.s_addr) {
LOG(logLevelWarn, "Unable to find broadcast address of interface %s.", inetAddressToString(node.ifaceAddr, false).c_str());
}
#if !defined(_WIN32)
else
{
/* An oddness of BSD sockets (not winsock) is that binding to
* INADDR_ANY will receive unicast and broadcast, but binding to
* a specific interface address receives only unicast. The trick
* is to bind a second socket to the interface broadcast address,
* which will then receive only broadcasts.
*/
osiSockAddr bcastAddress;
bcastAddress.ia.sin_family = AF_INET;
bcastAddress.ia.sin_port = htons(listenPort);
bcastAddress.ia.sin_addr.s_addr = node.ifaceBCast.ia.sin_addr.s_addr;
transport2 = static_pointer_cast<BlockingUDPTransport>(connector->connect(
nullTransportClient, responseHandler,
bcastAddress, PVA_PROTOCOL_REVISION,
PVA_DEFAULT_PRIORITY));
/* The other wrinkle is that nothing should be sent from this second
* socket. So replies are made through the unicast socket.
*/
transport2->setReplyTransport(transport);
if (ignoreAddressVector.get() && ignoreAddressVector->size())
transport2->setIgnoredAddresses(ignoreAddressVector.get());
tappedNIF.push_back(bcastAddress);
}
#endif
transport->setMutlicastNIF(loAddr, true);
transport->setLocalMulticastAddress(group);
transport->start();
udpTransports.push_back(transport);
if (transport2)
{
transport2->start();
udpTransports.push_back(transport2);
}
}
catch (std::exception& e)
{
THROW_BASE_EXCEPTION_CAUSE("Failed to initialize UDP transport.", e);
}
catch (...)
{
THROW_BASE_EXCEPTION("Failed to initialize UDP transport.");
}
}
//
// Create UDP transport for sending (to all network interfaces)
//
osiSockAddr anyAddress;
anyAddress.ia.sin_family = AF_INET;
anyAddress.ia.sin_port = htons(0);
anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
sendTransport = static_pointer_cast<BlockingUDPTransport>(connector->connect(
nullTransportClient, responseHandler,
anyAddress, PVA_PROTOCOL_REVISION,
PVA_DEFAULT_PRIORITY));
// TODO current implementation shares the port (aka beacon and search port)
int32 sendPort = listenPort;
//
// compile auto address list - where to send packets
//
InetAddrVector autoBCastAddr;
for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++)
{
ifaceNode node = *iter;
if (node.ifaceBCast.ia.sin_family != AF_UNSPEC)
{
node.ifaceBCast.ia.sin_port = htons(sendPort);
autoBCastAddr.push_back(node.ifaceBCast);
}
}
//
// set send address list
//
if (!addressList.empty())
{
// if auto is true, add it to specified list
if (!autoAddressList)
autoBCastAddr.clear();
auto_ptr<InetAddrVector> list(getSocketAddressList(addressList, sendPort, &autoBCastAddr));
if (list.get() && list->size())
{
sendTransport->setSendAddresses(list.get());
}
/*
else
{
// fallback
// set default (auto) address list
sendTransport->setSendAddresses(&autoBCastAddr);
}
*/
}
else if (autoAddressList)
{
// set default (auto) address list
sendTransport->setSendAddresses(&autoBCastAddr);
}
sendTransport->start();
udpTransports.push_back(sendTransport);
// debug output of broadcast addresses
InetAddrVector* blist = sendTransport->getSendAddresses();
if (!blist || !blist->size())
LOG(logLevelError,
"No broadcast addresses found or specified - empty address list!");
else
for (size_t i = 0; i < blist->size(); i++)
LOG(logLevelDebug,
"Broadcast address #%d: %s.", i, inetAddressToString((*blist)[i]).c_str());
//
// Setup local multicasting
//
BlockingUDPTransport::shared_pointer localMulticastTransport;
try
{
// NOTE: multicast receiver socket must be "bound" to INADDR_ANY or multicast address
localMulticastTransport = static_pointer_cast<BlockingUDPTransport>(connector->connect(
nullTransportClient, responseHandler,
group, PVA_PROTOCOL_REVISION,
PVA_DEFAULT_PRIORITY));
localMulticastTransport->setTappedNIF(&tappedNIF);
localMulticastTransport->join(group, loAddr);
localMulticastTransport->start();
udpTransports.push_back(localMulticastTransport);
LOG(logLevelDebug, "Local multicast enabled on %s/%s.",
inetAddressToString(loAddr, false).c_str(),
inetAddressToString(group).c_str());
}
catch (std::exception& ex)
{
LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what());
}
}
}
}