server: multiple NIF support for UDP

This commit is contained in:
Matej Sekoranja
2016-01-04 14:12:19 +01:00
parent 708379ec0a
commit 21a1dad07f
5 changed files with 370 additions and 272 deletions

View File

@@ -33,7 +33,6 @@ ServerContextImpl::ServerContextImpl():
_serverPort(PVA_SERVER_PORT),
_receiveBufferSize(MAX_TCP_RECV),
_timer(),
_broadcastTransport(),
_beaconEmitter(),
_acceptor(),
_transportRegistry(),
@@ -166,20 +165,19 @@ void ServerContextImpl::loadConfiguration()
_channelProviderNames = config->getPropertyAsString("EPICS_PVA_PROVIDER_NAMES", _channelProviderNames);
_channelProviderNames = config->getPropertyAsString("EPICS_PVAS_PROVIDER_NAMES", _channelProviderNames);
_ifaceBCast.ia.sin_family = AF_UNSPEC;
if(_ifaceAddr.ia.sin_addr.s_addr != htonl(INADDR_ANY)) {
ELLLIST alist = ELLLIST_INIT;
SOCKET sock = epicsSocketCreate(AF_INET, SOCK_STREAM, 0);
if(sock) {
osiSockDiscoverBroadcastAddresses(&alist, sock, &_ifaceAddr);
epicsSocketDestroy(sock);
}
if(ellCount(&alist)>0) {
osiSockAddrNode *node = (osiSockAddrNode*)ellFirst(&alist);
_ifaceBCast = node->addr;
}
ellFree(&alist);
SOCKET sock = epicsSocketCreate(AF_INET, SOCK_STREAM, 0);
if (!sock) {
THROW_BASE_EXCEPTION("Failed to initialize broadcast UDP transport");
return;
}
if (discoverInterfaces(_ifaceList, sock, &_ifaceAddr) || _ifaceList.size() == 0)
{
THROW_BASE_EXCEPTION("Failed to initialize broadcast UDP transport, no interfaces available.");
return;
}
epicsSocketDestroy(sock);
}
bool ServerContextImpl::isChannelProviderNamePreconfigured()
@@ -269,201 +267,230 @@ void ServerContextImpl::internalInitialize()
// setup broadcast UDP transport
initializeBroadcastTransport();
// TODO introduce a constant
// TODO introduce "tcp" a constant
_beaconEmitter.reset(new BeaconEmitter("tcp", _broadcastTransport, thisServerContext));
}
void ServerContextImpl::initializeBroadcastTransport()
{
// setup UDP transport
try
{
// where to bind (listen) address
osiSockAddr listenLocalAddress;
listenLocalAddress.ia.sin_family = AF_INET;
listenLocalAddress.ia.sin_port = htons(_broadcastPort);
listenLocalAddress.ia.sin_addr.s_addr = _ifaceAddr.ia.sin_addr.s_addr;
TransportClient::shared_pointer nullTransportClient;
auto_ptr<BlockingUDPConnector> broadcastConnector(new BlockingUDPConnector(true, true, true));
osiSockAddr anyAddress;
anyAddress.ia.sin_family = AF_INET;
anyAddress.ia.sin_port = htons(0);
anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY);
_broadcastTransport = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
nullTransportClient, _responseHandler,
anyAddress, PVA_PROTOCOL_REVISION,
PVA_DEFAULT_PRIORITY));
//
// set broadcast (send) addresses - where to send beacons
//
InetAddrVector autoBCastAddr;
for (IfaceNodeVector::const_iterator iter = _ifaceList.begin(); iter != _ifaceList.end(); iter++)
{
ifaceNode node = *iter;
if (node.ifaceBCast.ia.sin_family != AF_UNSPEC)
{
node.ifaceBCast.ia.sin_port = htons(_broadcastPort);
autoBCastAddr.push_back(node.ifaceBCast);
}
}
//
// set beacon (broadcast) address list
//
TransportClient::shared_pointer nullTransportClient;
// set broadcast address list
if (!_beaconAddressList.empty())
{
// if auto is true, add it to specified list
if (!_autoBeaconAddressList)
autoBCastAddr.clear();
auto_ptr<BlockingUDPConnector> broadcastConnector(new BlockingUDPConnector(true, true, true));
_broadcastTransport = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
nullTransportClient, _responseHandler,
listenLocalAddress, PVA_PROTOCOL_REVISION,
PVA_DEFAULT_PRIORITY));
listenLocalAddress = *_broadcastTransport->getRemoteAddress();
_broadcastPort = ntohs(listenLocalAddress.ia.sin_port);
_ifaceBCast.ia.sin_port = listenLocalAddress.ia.sin_port;
auto_ptr<InetAddrVector> list(getSocketAddressList(_beaconAddressList, _broadcastPort, &autoBCastAddr));
if (list.get() && list->size())
{
_broadcastTransport->setSendAddresses(list.get());
}
else // TODO or no fallback at all
{
// fallback
// set default (auto) address list
_broadcastTransport->setSendAddresses(&autoBCastAddr);
}
}
else
{
// fallback
// set default (auto) address list
_broadcastTransport->setSendAddresses(&autoBCastAddr);
}
if(_ifaceAddr.ia.sin_addr.s_addr != htonl(INADDR_ANY)) {
if(_ifaceBCast.ia.sin_family == AF_UNSPEC ||
_ifaceBCast.ia.sin_addr.s_addr == listenLocalAddress.ia.sin_addr.s_addr) {
LOG(logLevelWarn, "Unable to find broadcast address of interface %s.", inetAddressToString(_ifaceBCast, false).c_str());
// debug output for broadcast addresses
InetAddrVector* blist = _broadcastTransport->getSendAddresses();
if (!blist || !blist->size())
LOG(logLevelWarn,
"No beacon broadcast addresses found or specified!");
else
for (size_t i = 0; i < blist->size(); i++)
LOG(logLevelDebug,
"Beacon broadcast address #%d: %s.", i, inetAddressToString((*blist)[i]).c_str());
//
// set ignore address list
//
auto_ptr<InetAddrVector> ignoreAddressList;
if (!_ignoreAddressList.empty())
{
// we do not care about the port
ignoreAddressList.reset(getSocketAddressList(_ignoreAddressList, 0, NULL));
}
// TODO configurable local NIF, address
osiSockAddr loAddr;
getLoopbackNIF(loAddr, "", 0);
for (IfaceNodeVector::const_iterator iter = _ifaceList.begin(); iter != _ifaceList.end(); iter++)
{
ifaceNode node = *iter;
LOG(logLevelDebug, "Setting up UDP for interface %s, broadcast %s, index %d.",
inetAddressToString(node.ifaceAddr, false).c_str(),
inetAddressToString(node.ifaceBCast, false).c_str(),
node.ifaceIndex);
try
{
// where to bind (listen) address
// TODO opt copy
osiSockAddr listenLocalAddress;
listenLocalAddress.ia.sin_family = AF_INET;
listenLocalAddress.ia.sin_port = htons(_broadcastPort);
listenLocalAddress.ia.sin_addr.s_addr = node.ifaceAddr.ia.sin_addr.s_addr;
BlockingUDPTransport::shared_pointer transport = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
nullTransportClient, _responseHandler,
listenLocalAddress, PVA_PROTOCOL_REVISION,
PVA_DEFAULT_PRIORITY));
listenLocalAddress = *transport->getRemoteAddress();
if (ignoreAddressList.get() && ignoreAddressList->size())
transport->setIgnoredAddresses(ignoreAddressList.get());
BlockingUDPTransport::shared_pointer transport2;
if(node.ifaceBCast.ia.sin_family == AF_UNSPEC ||
node.ifaceBCast.ia.sin_addr.s_addr == listenLocalAddress.ia.sin_addr.s_addr) {
LOG(logLevelWarn, "Unable to find broadcast address of interface %s.", inetAddressToString(node.ifaceBCast, false).c_str());
}
#if !defined(_WIN32)
else
{
/* An oddness of BSD sockets (not winsock) is that binding to
* INADDR_ANY will receive unicast and broadcast, but binding to
* a specific interface address receives only unicast. The trick
* is to bind a second socket to the interface broadcast address,
* which will then receive only broadcasts.
*/
// TODO opt copy
osiSockAddr bcastAddress;
bcastAddress.ia.sin_family = AF_INET;
bcastAddress.ia.sin_port = htons(_broadcastPort);
bcastAddress.ia.sin_addr.s_addr = node.ifaceBCast.ia.sin_addr.s_addr;
transport2 = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
nullTransportClient, _responseHandler,
bcastAddress, PVA_PROTOCOL_REVISION,
PVA_DEFAULT_PRIORITY));
/* The other wrinkle is that nothing should be sent from this second
* socket. So replies are made through the unicast socket.
*/
transport2->setReplyTransport(transport);
if (ignoreAddressList.get() && ignoreAddressList->size())
transport2->setIgnoredAddresses(ignoreAddressList.get());
}
#endif
// TODO set ignore list on transport and transport2
//
// Setup local broadcasting
//
// Each network interface gets its own multicast group on a local interface.
// Multicast address is determined by prefix 224.0.0.128 + NIF index
//
osiSockAddr group;
int lastAddr = 128 + node.ifaceIndex;
std::ostringstream o;
// TODO configurable prefix and base
o << "224.0.0." << lastAddr;
aToIPAddr(o.str().c_str(), _broadcastPort, &group.ia);
transport->setMutlicastNIF(loAddr, true);
transport->setLocalMulticastAddress(group);
BlockingUDPTransport::shared_pointer localMulticastTransport;
if (true)
{
try
{
// NOTE: multicast receiver socket must be "bound" to INADDR_ANY or multicast address
localMulticastTransport = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
nullTransportClient, _responseHandler,
group, PVA_PROTOCOL_REVISION,
PVA_DEFAULT_PRIORITY));
localMulticastTransport->join(group, loAddr);
LOG(logLevelDebug, "Local multicast for %s enabled on %s/%s.",
inetAddressToString(listenLocalAddress, false).c_str(),
inetAddressToString(loAddr, false).c_str(),
inetAddressToString(group).c_str());
}
catch (std::exception& ex)
{
LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what());
}
}
#if !defined(_WIN32)
else
{
/* An oddness of BSD sockets (not winsock) is that binding to
* INADDR_ANY will receive unicast and broadcast, but binding to
* a specific interface address receives only unicast. The trick
* is to bind a second socket to the interface broadcast address,
* which will then receive only broadcasts.
*/
_broadcastTransport2 = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
nullTransportClient, _responseHandler,
_ifaceBCast, PVA_PROTOCOL_REVISION,
PVA_DEFAULT_PRIORITY));
/* The other wrinkle is that nothing should be sent from this second
* socket. So replies are made through the unicast socket.
*/
_broadcastTransport2->setReplyTransport(_broadcastTransport);
LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled.");
}
transport->start();
if(transport2)
transport2->start();
if (localMulticastTransport)
localMulticastTransport->start();
_udpTransports.push_back(transport);
_udpTransports.push_back(transport2);
_udpTransports.push_back(localMulticastTransport);
}
#endif
SOCKET socket = epicsSocketCreate(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (socket == INVALID_SOCKET)
catch (std::exception& e)
{
THROW_BASE_EXCEPTION_CAUSE("Failed to initialize broadcast UDP transport", e);
}
catch (...)
{
THROW_BASE_EXCEPTION("Failed to initialize broadcast UDP transport");
}
auto_ptr<InetAddrVector> broadcastAddresses;
if(_ifaceAddr.ia.sin_addr.s_addr != htonl(INADDR_ANY))
{
InetAddrVector * v = new InetAddrVector;
v->push_back(_ifaceBCast);
broadcastAddresses.reset(v);
}
else
{
// all the interfaces
broadcastAddresses.reset(getBroadcastAddresses(socket, _broadcastPort));
}
int ifIndex = discoverInterfaceIndex(socket, &listenLocalAddress);
if (ifIndex == -1)
{
LOG(logLevelWarn, "Unable to find interface index for %s.", inetAddressToString(listenLocalAddress, false).c_str());
// TODO fallback
}
epicsSocketDestroy(socket);
// set default (auto) address list
_broadcastTransport->setSendAddresses(broadcastAddresses.get());
// set broadcast address list
if (!_beaconAddressList.empty())
{
// if auto is true, add it to specified list
InetAddrVector* appendList = NULL;
if (_autoBeaconAddressList == true)
{
appendList = _broadcastTransport->getSendAddresses();
}
auto_ptr<InetAddrVector> list(getSocketAddressList(_beaconAddressList, _broadcastPort, appendList));
if (list.get() != NULL && list->size() > 0)
{
_broadcastTransport->setSendAddresses(list.get());
}
}
// debug output for broadcast addresses
InetAddrVector* blist = _broadcastTransport->getSendAddresses();
if (!blist || !blist->size())
LOG(logLevelWarn,
"No broadcast addresses found or specified!");
else
for (size_t i = 0; i < blist->size(); i++)
LOG(logLevelDebug,
"Broadcast address #%d: %s.", i, inetAddressToString((*blist)[i]).c_str());
//
// set ignore address list
//
if (!_ignoreAddressList.empty())
{
// we do not care about the port
auto_ptr<InetAddrVector> list(getSocketAddressList(_ignoreAddressList, 0, NULL));
if (list.get() != NULL && list->size() > 0)
{
_broadcastTransport->setIgnoredAddresses(list.get());
}
}
//
// Setup local broadcasting
//
// Each network interface gets its own multicast group on a local interface.
// Multicast address is determined by prefix 224.0.0.124 + NIF index
//
// TODO configurable local NIF, address
osiSockAddr loAddr;
getLoopbackNIF(loAddr, "", 0);
osiSockAddr group;
int lastAddr = 128 + ifIndex;
std::ostringstream o;
// TODO configurable prefix and base
o << "224.0.0." << lastAddr;
aToIPAddr(o.str().c_str(), _broadcastPort, &group.ia);
_broadcastTransport->setMutlicastNIF(loAddr, true);
_broadcastTransport->setLocalMulticastAddress(group);
if (true)
{
try
{
// NOTE: multicast receiver socket must be "bound" to INADDR_ANY or multicast address
_localMulticastTransport = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
nullTransportClient, _responseHandler,
group, PVA_PROTOCOL_REVISION,
PVA_DEFAULT_PRIORITY));
_localMulticastTransport->join(group, loAddr);
/* used for sending
_localMulticastTransport->setMutlicastNIF(loAddr, true);
InetAddrVector sendAddressList;
sendAddressList.push_back(group);
_localMulticastTransport->setSendAddresses(&sendAddressList);
*/
LOG(logLevelDebug, "Local multicast for %s enabled on %s/%s.",
inetAddressToString(listenLocalAddress, false).c_str(),
inetAddressToString(loAddr, false).c_str(),
inetAddressToString(group).c_str());
}
catch (std::exception& ex)
{
LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what());
}
}
else
{
LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled.");
}
_broadcastTransport->start();
if(_broadcastTransport2)
_broadcastTransport2->start();
if (_localMulticastTransport)
_localMulticastTransport->start();
}
catch (std::exception& e)
{
THROW_BASE_EXCEPTION_CAUSE("Failed to initialize broadcast UDP transport", e);
}
catch (...)
{
THROW_BASE_EXCEPTION("Failed to initialize broadcast UDP transport");
}
}
}
void ServerContextImpl::run(int32 seconds)
@@ -552,34 +579,32 @@ void ServerContextImpl::destroy()
void ServerContextImpl::internalDestroy()
{
// stop responding to search requests
if (_broadcastTransport)
_broadcastTransport->close();
if (_broadcastTransport2)
_broadcastTransport2->close();
_broadcastTransport.reset();
_broadcastTransport2.reset();
for (BlockingUDPTransportVector::const_iterator iter = _udpTransports.begin();
iter != _udpTransports.end(); iter++)
(*iter)->close();
_udpTransports.clear();
// and close local multicast transport
if (_localMulticastTransport.get())
// stop emitting beacons
if (_beaconEmitter)
{
_localMulticastTransport->close();
_localMulticastTransport.reset();
_beaconEmitter->destroy();
_beaconEmitter.reset();
}
// close UDP sent transport
if (_broadcastTransport)
{
_broadcastTransport->close();
_broadcastTransport.reset();
}
// stop accepting connections
if (_acceptor.get())
if (_acceptor)
{
_acceptor->destroy();
_acceptor.reset();
}
// stop emitting beacons
if (_beaconEmitter.get())
{
_beaconEmitter->destroy();
_beaconEmitter.reset();
}
// this will also destroy all channels
destroyAllTransports();
}
@@ -732,12 +757,7 @@ osiSockAddr* ServerContextImpl::getServerInetAddress()
BlockingUDPTransport::shared_pointer ServerContextImpl::getBroadcastTransport()
{
return _broadcastTransport;
}
BlockingUDPTransport::shared_pointer ServerContextImpl::getLocalMulticastTransport()
{
return _localMulticastTransport;
return _broadcastTransport;
}
ChannelProviderRegistry::shared_pointer ServerContextImpl::getChannelProviderRegistry()