local multicast implemented

This commit is contained in:
Matej Sekoranja
2014-09-01 14:10:36 +02:00
parent c1115437e3
commit e6ca9ea7f2
4 changed files with 90 additions and 25 deletions

View File

@@ -281,6 +281,10 @@ namespace epics {
}
}
void join(const osiSockAddr & mcastAddr, const osiSockAddr & nifAddr);
void setMutlicastNIF(const osiSockAddr & nifAddr, bool loopback);
protected:
AtomicBoolean _closed;

View File

@@ -4,6 +4,11 @@
* in file LICENSE that is included with this distribution.
*/
#ifdef _WIN32
// needed for ip_mreq
#include <Ws2tcpip.h>
#endif
#include <pv/blockingUDP.h>
#include <pv/pvaConstants.h>
#include <pv/inetAddressUtil.h>
@@ -408,5 +413,58 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so
((BlockingUDPTransport*)param)->processRead();
}
void BlockingUDPTransport::join(const osiSockAddr & mcastAddr, const osiSockAddr & nifAddr)
{
struct ip_mreq imreq;
memset(&imreq, 0, sizeof(struct ip_mreq));
imreq.imr_multiaddr.s_addr = mcastAddr.ia.sin_addr.s_addr;
imreq.imr_interface.s_addr = nifAddr.ia.sin_addr.s_addr;
// join multicast group on default interface
int status = ::setsockopt(_channel, IPPROTO_IP, IP_ADD_MEMBERSHIP,
(char*)&imreq, sizeof(struct ip_mreq));
if (status)
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
throw std::runtime_error(
string("Failed to join to the multicast group '") +
inetAddressToString(mcastAddr) + "' on network interface '" +
inetAddressToString(nifAddr, false) + "': " + errStr);
}
}
void BlockingUDPTransport::setMutlicastNIF(const osiSockAddr & nifAddr, bool loopback)
{
// set the multicast outgoing interface
int status = ::setsockopt(_channel, IPPROTO_IP, IP_MULTICAST_IF,
(char*)&nifAddr.ia.sin_addr, sizeof(struct in_addr));
if (status)
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
throw std::runtime_error(
string("Failed to set multicast network inteface '") +
inetAddressToString(nifAddr, false) + "': " + errStr);
}
// send multicast traffic to myself too
unsigned char mcast_loop = (loopback ? 1 : 0);
status = ::setsockopt(_channel, IPPROTO_IP, IP_MULTICAST_LOOP,
(char*)&mcast_loop, sizeof(unsigned char));
if (status)
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
throw std::runtime_error(
string("Failed to enable multicast loopback on network inteface '") +
inetAddressToString(nifAddr, false) + "': " + errStr);
}
}
}
}

View File

@@ -215,7 +215,7 @@ byteAddress[i] = payloadBuffer->getByte();
if (payloadBuffer->getShort() != (int16)0xFFFF) return;
// accept given address if explicitly specified by sender
responseAddress.ia.sin_addr.s_addr = htonl(payloadBuffer->getInt());
responseAddress.ia.sin_addr.s_addr = payloadBuffer->getInt();
if (responseAddress.ia.sin_addr.s_addr == INADDR_ANY)
responseAddress.ia.sin_addr = responseFrom->ia.sin_addr;
@@ -358,7 +358,7 @@ void ServerChannelFindRequesterImpl::channelFindResult(const Status& /*status*/,
{
ServerSearchHandler::s_channelNameToProvider[_name] = channelFind->getChannelProvider();
}
_wasFound = wasFound;
TransportSender::shared_pointer thisSender = shared_from_this();
_context->getBroadcastTransport()->enqueueSendRequest(thisSender);
@@ -403,7 +403,6 @@ void ServerChannelFindRequesterImpl::send(ByteBuffer* buffer, TransportSendContr
buffer->putShort((int16)0);
}
control->setRecipient(_sendTo);
}

View File

@@ -295,42 +295,46 @@ void ServerContextImpl::initializeBroadcastTransport()
}
}
// TODO
/*
// TODO configurable local NIF, address
// setup local broadcasting
NetworkInterface localNIF = InetAddressUtil.getLoopbackNIF();
if (localNIF != null)
// TODO configurable local NIF, address
osiSockAddr loAddr;
getLoopbackNIF(loAddr, "", 0);
if (true)
{
try
{
InetAddress group = InetAddress.getByName("224.0.0.128");
broadcastTransport.join(group, localNIF);
osiSockAddr group;
aToIPAddr("224.0.0.128", _broadcastPort, &group.ia);
_broadcastTransport->join(group, loAddr);
logger.config("Local multicast enabled on " + group + ":" + broadcastPort + " using " + localNIF.getDisplayName() + ".");
LOG(logLevelDebug, "Local multicast enabled on %s using network interface %s.",
inetAddressToString(group).c_str(), inetAddressToString(loAddr, false).c_str());
localMulticastTransport = (BlockingUDPTransport)broadcastConnector.connect(
// localMulticastTransport = (UDPTransport)broadcastConnector.connect(
null, serverResponseHandler,
listenLocalAddress, PVAConstants.PVA_PROTOCOL_REVISION,
PVAConstants.PVA_DEFAULT_PRIORITY);
localMulticastTransport.setMutlicastNIF(localNIF, true);
localMulticastTransport.setSendAddresses(new InetSocketAddress[] {
new InetSocketAddress(group, broadcastPort)
});
_localMulticastTransport = static_pointer_cast<BlockingUDPTransport>(broadcastConnector->connect(
nullTransportClient, responseHandler,
listenLocalAddress, PVA_PROTOCOL_REVISION,
PVA_DEFAULT_PRIORITY));
_localMulticastTransport->setMutlicastNIF(loAddr, true);
InetAddrVector sendAddressList;
sendAddressList.push_back(group);
_localMulticastTransport->setSendAddresses(&sendAddressList);
}
catch (Throwable th)
catch (std::exception& ex)
{
logger.log(Level.CONFIG, "Failed to join to a multicast group, local multicast disabled.", th);
if (_localMulticastTransport)
{
_localMulticastTransport->close();
_localMulticastTransport.reset();
}
LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what());
}
}
else
{
logger.config("Failed to detect a loopback network interface, local multicast disabled.");
LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled.");
}
*/
_broadcastTransport->start();
}
catch (std::exception& e)