multicast test added

This commit is contained in:
Matej Sekoranja
2014-08-20 00:24:42 +02:00
parent 114b2afbb9
commit b7f545aa06
6 changed files with 180 additions and 10 deletions

View File

@@ -188,6 +188,9 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom,
transport, version, command, payloadSize, payloadBuffer);
transport->ensureData(4+1+3+16+2);
size_t startPosition = payloadBuffer->getPosition();
const int32 searchSequenceId = payloadBuffer->getInt();
const int8 qosCode = payloadBuffer->getByte();
@@ -200,9 +203,9 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom,
// 128-bit IPv6 address
/*
int8* byteAddress = new int8[16];
int8 byteAddress[16];
for (int i = 0; i < 16; i++)
byteAddress[i] = payloadBuffer->getByte(); };
byteAddress[i] = payloadBuffer->getByte();
*/
// IPv4 compatible IPv6 address expected
@@ -234,7 +237,27 @@ byteAddress[i] = payloadBuffer->getByte(); };
const bool responseRequired = (QOS_REPLY_REQUIRED & qosCode) != 0;
// TODO locally broadcast if qosCode & 0x80 == 0x80
//
// locally broadcast if unicast (qosCode & 0x80 == 0x80)
//
if ((qosCode & 0x80) == 0x80)
{
BlockingUDPTransport::shared_pointer bt = _context->getLocalMulticastTransport();
if (bt)
{
// clear unicast flag
payloadBuffer->put(startPosition+4, (int8)(qosCode & ~0x80));
// update response address
payloadBuffer->setPosition(startPosition+8);
encodeAsIPv6Address(payloadBuffer, &responseAddress);
payloadBuffer->setPosition(payloadBuffer->getLimit()); // send will call flip()
bt->send(payloadBuffer);
return;
}
}
if (count > 0)
{

View File

@@ -295,6 +295,42 @@ void ServerContextImpl::initializeBroadcastTransport()
}
}
// TODO
/*
// TODO configurable local NIF, address
// setup local broadcasting
NetworkInterface localNIF = InetAddressUtil.getLoopbackNIF();
if (localNIF != null)
{
try
{
InetAddress group = InetAddress.getByName("224.0.0.128");
broadcastTransport.join(group, localNIF);
logger.config("Local multicast enabled on " + group + ":" + broadcastPort + " using " + localNIF.getDisplayName() + ".");
localMulticastTransport = (BlockingUDPTransport)broadcastConnector.connect(
// localMulticastTransport = (UDPTransport)broadcastConnector.connect(
null, serverResponseHandler,
listenLocalAddress, PVAConstants.PVA_PROTOCOL_REVISION,
PVAConstants.PVA_DEFAULT_PRIORITY);
localMulticastTransport.setMutlicastNIF(localNIF, true);
localMulticastTransport.setSendAddresses(new InetSocketAddress[] {
new InetSocketAddress(group, broadcastPort)
});
}
catch (Throwable th)
{
logger.log(Level.CONFIG, "Failed to join to a multicast group, local multicast disabled.", th);
}
}
else
{
logger.config("Failed to detect a loopback network interface, local multicast disabled.");
}
*/
_broadcastTransport->start();
}
catch (std::exception& e)
@@ -398,6 +434,12 @@ void ServerContextImpl::internalDestroy()
_broadcastTransport->close();
_broadcastTransport.reset();
}
// and close local multicast transport
if (_localMulticastTransport.get())
{
_localMulticastTransport->close();
_localMulticastTransport.reset();
}
// stop accepting connections
if (_acceptor.get())
@@ -568,6 +610,11 @@ BlockingUDPTransport::shared_pointer ServerContextImpl::getBroadcastTransport()
return _broadcastTransport;
}
BlockingUDPTransport::shared_pointer ServerContextImpl::getLocalMulticastTransport()
{
return _localMulticastTransport;
}
ChannelProviderRegistry::shared_pointer ServerContextImpl::getChannelProviderRegistry()
{
return _channelProviderRegistry;

View File

@@ -142,6 +142,9 @@ public:
std::auto_ptr<ResponseHandler> createResponseHandler();
virtual void newServerDetected();
BlockingUDPTransport::shared_pointer getLocalMulticastTransport();
/**
* Version.
*/
@@ -349,7 +352,12 @@ private:
*/
BlockingUDPTransport::shared_pointer _broadcastTransport;
/**
/**
* Local broadcast transport needed for local fan-out.
*/
BlockingUDPTransport::shared_pointer _localMulticastTransport;
/**
* Beacon emitter.
*/
BeaconEmitter::shared_pointer _beaconEmitter;

View File

@@ -83,7 +83,7 @@ int32 ipv4AddressToInt(const osiSockAddr& addr) {
return (int32)ntohl(addr.ia.sin_addr.s_addr);
}
int32 parseInetAddress(const string addr) {
int32 parseInetAddress(const string & addr) {
int32 retAddr;
size_t dot = addr.find('.');
@@ -117,7 +117,7 @@ int32 parseInetAddress(const string addr) {
return htonl(retAddr);
}
InetAddrVector* getSocketAddressList(std::string list, int defaultPort,
InetAddrVector* getSocketAddressList(const std::string & list, int defaultPort,
const InetAddrVector* appendList) {
InetAddrVector* iav = new InetAddrVector();
@@ -145,7 +145,7 @@ InetAddrVector* getSocketAddressList(std::string list, int defaultPort,
return iav;
}
const string inetAddressToString(const osiSockAddr &addr,
string inetAddressToString(const osiSockAddr &addr,
bool displayPort, bool displayHex) {
stringstream saddr;
@@ -162,5 +162,21 @@ const string inetAddressToString(const osiSockAddr &addr,
return saddr.str();
}
int getLoopbackNIF(osiSockAddr &loAddr, string const & localNIF, unsigned short port)
{
if (!localNIF.empty())
{
if (aToIPAddr(localNIF.c_str(), port, &loAddr.ia) == 0)
return 0;
// else TODO log error
}
// fallback
loAddr.ia.sin_family = AF_INET;
loAddr.ia.sin_port = ntohs(port);
loAddr.ia.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
return 1;
}
}
}

View File

@@ -77,12 +77,14 @@ namespace pvAccess {
* @param appendList list to be appended.
* @return array of <code>InetSocketAddress</code>.
*/
epicsShareFunc InetAddrVector* getSocketAddressList(std::string list, int defaultPort,
epicsShareFunc InetAddrVector* getSocketAddressList(const std::string & list, int defaultPort,
const InetAddrVector* appendList = NULL);
epicsShareFunc const std::string inetAddressToString(const osiSockAddr &addr,
epicsShareFunc std::string inetAddressToString(const osiSockAddr &addr,
bool displayPort = true, bool displayHex = false);
epicsShareFunc int getLoopbackNIF(osiSockAddr& loAddr, std::string const & localNIF, unsigned short port);
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
// comparators for osiSockAddr

View File

@@ -160,9 +160,80 @@ void test_getBroadcastAddresses()
}
void test_getLoopbackNIF()
{
testDiag("Test getLoopbackNIF()");
osiSockAddr addr;
unsigned short port = 5555;
int defaultValue = getLoopbackNIF(addr, "", port);
testOk1(defaultValue);
testOk1(AF_INET == addr.ia.sin_family);
testOk1(htons(port) == addr.ia.sin_port);
testOk1(htonl(INADDR_LOOPBACK) == addr.ia.sin_addr.s_addr);
defaultValue = getLoopbackNIF(addr, "10.0.0.1:7777", port);
testOk1(!defaultValue);
testOk1(AF_INET == addr.ia.sin_family);
testOk1(htons(7777) == addr.ia.sin_port);
testOk1(htonl(0x0A000001) == addr.ia.sin_addr.s_addr);
}
void test_multicast()
{
testDiag("Test test_multicast()");
osiSockAttach();
SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
testOk1(socket != INVALID_SOCKET);
if (socket != INVALID_SOCKET)
return;
/*
// set SO_REUSEADDR or SO_REUSEPORT, OS dependant
epicsSocketEnableAddressUseForDatagramFanout(socket);
osiSockAddr bindAddr;
bindAddr.ia.sin_family = AF_INET;
bindAddr.ia.sin_port = ntohs(5555);
bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY);
int status = ::bind(socket, (sockaddr*)&(bindAddr.sa), sizeof(sockaddr));
if (status)
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
fprintf(stderr, "Failed to bind: %s", errStr);
epicsSocketDestroy(socket);
return;
}
*/
struct ip_mreq imreq;
memset(&imreq, 0, sizeof(struct ip_mreq));
imreq.imr_multiaddr.s_addr = inet_addr("224.0.0.1");
imreq.imr_interface.s_addr = INADDR_ANY;
// join multicast group on default interface
int status = ::setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP,
(const void *)&imreq, sizeof(struct ip_mreq));
if (status)
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
fprintf(stderr, "Error setting IP_ADD_MEMBERSHIP: %s", errStr);
}
testOk1(status == 0);
epicsSocketDestroy(socket);
}
MAIN(testInetAddressUtils)
{
testPlan(51);
testPlan(60);
testDiag("Tests for InetAddress utils");
test_getSocketAddressList();
@@ -171,6 +242,9 @@ MAIN(testInetAddressUtils)
test_encodeAsIPv6Address();
test_isMulticastAddress();
test_getBroadcastAddresses();
test_getLoopbackNIF();
test_multicast();
return testDone();
}