diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index fda28c2..ec532de 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -188,6 +188,9 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, transport, version, command, payloadSize, payloadBuffer); transport->ensureData(4+1+3+16+2); + + size_t startPosition = payloadBuffer->getPosition(); + const int32 searchSequenceId = payloadBuffer->getInt(); const int8 qosCode = payloadBuffer->getByte(); @@ -200,9 +203,9 @@ void ServerSearchHandler::handleResponse(osiSockAddr* responseFrom, // 128-bit IPv6 address /* -int8* byteAddress = new int8[16]; +int8 byteAddress[16]; for (int i = 0; i < 16; i++) -byteAddress[i] = payloadBuffer->getByte(); }; +byteAddress[i] = payloadBuffer->getByte(); */ // IPv4 compatible IPv6 address expected @@ -234,7 +237,27 @@ byteAddress[i] = payloadBuffer->getByte(); }; const bool responseRequired = (QOS_REPLY_REQUIRED & qosCode) != 0; - // TODO locally broadcast if qosCode & 0x80 == 0x80 + // + // locally broadcast if unicast (qosCode & 0x80 == 0x80) + // + if ((qosCode & 0x80) == 0x80) + { + BlockingUDPTransport::shared_pointer bt = _context->getLocalMulticastTransport(); + if (bt) + { + // clear unicast flag + payloadBuffer->put(startPosition+4, (int8)(qosCode & ~0x80)); + + // update response address + payloadBuffer->setPosition(startPosition+8); + encodeAsIPv6Address(payloadBuffer, &responseAddress); + + payloadBuffer->setPosition(payloadBuffer->getLimit()); // send will call flip() + + bt->send(payloadBuffer); + return; + } + } if (count > 0) { diff --git a/src/server/serverContext.cpp b/src/server/serverContext.cpp index 68eb734..f047af2 100644 --- a/src/server/serverContext.cpp +++ b/src/server/serverContext.cpp @@ -295,6 +295,42 @@ void ServerContextImpl::initializeBroadcastTransport() } } + // TODO + /* + // TODO configurable local NIF, address + // setup local broadcasting + NetworkInterface localNIF = InetAddressUtil.getLoopbackNIF(); + if (localNIF != null) + { + try + { + InetAddress group = InetAddress.getByName("224.0.0.128"); + broadcastTransport.join(group, localNIF); + + logger.config("Local multicast enabled on " + group + ":" + broadcastPort + " using " + localNIF.getDisplayName() + "."); + + localMulticastTransport = (BlockingUDPTransport)broadcastConnector.connect( +// localMulticastTransport = (UDPTransport)broadcastConnector.connect( + null, serverResponseHandler, + listenLocalAddress, PVAConstants.PVA_PROTOCOL_REVISION, + PVAConstants.PVA_DEFAULT_PRIORITY); + localMulticastTransport.setMutlicastNIF(localNIF, true); + localMulticastTransport.setSendAddresses(new InetSocketAddress[] { + new InetSocketAddress(group, broadcastPort) + }); + } + catch (Throwable th) + { + logger.log(Level.CONFIG, "Failed to join to a multicast group, local multicast disabled.", th); + } + } + else + { + logger.config("Failed to detect a loopback network interface, local multicast disabled."); + } + + */ + _broadcastTransport->start(); } catch (std::exception& e) @@ -398,6 +434,12 @@ void ServerContextImpl::internalDestroy() _broadcastTransport->close(); _broadcastTransport.reset(); } + // and close local multicast transport + if (_localMulticastTransport.get()) + { + _localMulticastTransport->close(); + _localMulticastTransport.reset(); + } // stop accepting connections if (_acceptor.get()) @@ -568,6 +610,11 @@ BlockingUDPTransport::shared_pointer ServerContextImpl::getBroadcastTransport() return _broadcastTransport; } +BlockingUDPTransport::shared_pointer ServerContextImpl::getLocalMulticastTransport() +{ + return _localMulticastTransport; +} + ChannelProviderRegistry::shared_pointer ServerContextImpl::getChannelProviderRegistry() { return _channelProviderRegistry; diff --git a/src/server/serverContext.h b/src/server/serverContext.h index e0bd7f4..66527cf 100644 --- a/src/server/serverContext.h +++ b/src/server/serverContext.h @@ -142,6 +142,9 @@ public: std::auto_ptr createResponseHandler(); virtual void newServerDetected(); + + BlockingUDPTransport::shared_pointer getLocalMulticastTransport(); + /** * Version. */ @@ -349,7 +352,12 @@ private: */ BlockingUDPTransport::shared_pointer _broadcastTransport; - /** + /** + * Local broadcast transport needed for local fan-out. + */ + BlockingUDPTransport::shared_pointer _localMulticastTransport; + + /** * Beacon emitter. */ BeaconEmitter::shared_pointer _beaconEmitter; diff --git a/src/utils/inetAddressUtil.cpp b/src/utils/inetAddressUtil.cpp index ca14a39..6810ec8 100644 --- a/src/utils/inetAddressUtil.cpp +++ b/src/utils/inetAddressUtil.cpp @@ -83,7 +83,7 @@ int32 ipv4AddressToInt(const osiSockAddr& addr) { return (int32)ntohl(addr.ia.sin_addr.s_addr); } -int32 parseInetAddress(const string addr) { +int32 parseInetAddress(const string & addr) { int32 retAddr; size_t dot = addr.find('.'); @@ -117,7 +117,7 @@ int32 parseInetAddress(const string addr) { return htonl(retAddr); } -InetAddrVector* getSocketAddressList(std::string list, int defaultPort, +InetAddrVector* getSocketAddressList(const std::string & list, int defaultPort, const InetAddrVector* appendList) { InetAddrVector* iav = new InetAddrVector(); @@ -145,7 +145,7 @@ InetAddrVector* getSocketAddressList(std::string list, int defaultPort, return iav; } -const string inetAddressToString(const osiSockAddr &addr, +string inetAddressToString(const osiSockAddr &addr, bool displayPort, bool displayHex) { stringstream saddr; @@ -162,5 +162,21 @@ const string inetAddressToString(const osiSockAddr &addr, return saddr.str(); } +int getLoopbackNIF(osiSockAddr &loAddr, string const & localNIF, unsigned short port) +{ + if (!localNIF.empty()) + { + if (aToIPAddr(localNIF.c_str(), port, &loAddr.ia) == 0) + return 0; + // else TODO log error + } + + // fallback + loAddr.ia.sin_family = AF_INET; + loAddr.ia.sin_port = ntohs(port); + loAddr.ia.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + return 1; +} + } } diff --git a/src/utils/inetAddressUtil.h b/src/utils/inetAddressUtil.h index 00f5c37..e6bfb32 100644 --- a/src/utils/inetAddressUtil.h +++ b/src/utils/inetAddressUtil.h @@ -77,12 +77,14 @@ namespace pvAccess { * @param appendList list to be appended. * @return array of InetSocketAddress. */ - epicsShareFunc InetAddrVector* getSocketAddressList(std::string list, int defaultPort, + epicsShareFunc InetAddrVector* getSocketAddressList(const std::string & list, int defaultPort, const InetAddrVector* appendList = NULL); - epicsShareFunc const std::string inetAddressToString(const osiSockAddr &addr, + epicsShareFunc std::string inetAddressToString(const osiSockAddr &addr, bool displayPort = true, bool displayHex = false); + epicsShareFunc int getLoopbackNIF(osiSockAddr& loAddr, std::string const & localNIF, unsigned short port); + /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ // comparators for osiSockAddr diff --git a/testApp/utils/testInetAddressUtils.cpp b/testApp/utils/testInetAddressUtils.cpp index b2e06a5..4cb95c9 100644 --- a/testApp/utils/testInetAddressUtils.cpp +++ b/testApp/utils/testInetAddressUtils.cpp @@ -160,9 +160,80 @@ void test_getBroadcastAddresses() } +void test_getLoopbackNIF() +{ + testDiag("Test getLoopbackNIF()"); + + osiSockAddr addr; + unsigned short port = 5555; + + int defaultValue = getLoopbackNIF(addr, "", port); + + testOk1(defaultValue); + testOk1(AF_INET == addr.ia.sin_family); + testOk1(htons(port) == addr.ia.sin_port); + testOk1(htonl(INADDR_LOOPBACK) == addr.ia.sin_addr.s_addr); + + defaultValue = getLoopbackNIF(addr, "10.0.0.1:7777", port); + + testOk1(!defaultValue); + testOk1(AF_INET == addr.ia.sin_family); + testOk1(htons(7777) == addr.ia.sin_port); + testOk1(htonl(0x0A000001) == addr.ia.sin_addr.s_addr); +} + +void test_multicast() +{ + testDiag("Test test_multicast()"); + + osiSockAttach(); + + SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + testOk1(socket != INVALID_SOCKET); + if (socket != INVALID_SOCKET) + return; +/* + // set SO_REUSEADDR or SO_REUSEPORT, OS dependant + epicsSocketEnableAddressUseForDatagramFanout(socket); + + osiSockAddr bindAddr; + bindAddr.ia.sin_family = AF_INET; + bindAddr.ia.sin_port = ntohs(5555); + bindAddr.ia.sin_addr.s_addr = htonl(INADDR_ANY); + + int status = ::bind(socket, (sockaddr*)&(bindAddr.sa), sizeof(sockaddr)); + if (status) + { + char errStr[64]; + epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); + fprintf(stderr, "Failed to bind: %s", errStr); + epicsSocketDestroy(socket); + return; + } +*/ + struct ip_mreq imreq; + memset(&imreq, 0, sizeof(struct ip_mreq)); + + imreq.imr_multiaddr.s_addr = inet_addr("224.0.0.1"); + imreq.imr_interface.s_addr = INADDR_ANY; + + // join multicast group on default interface + int status = ::setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, + (const void *)&imreq, sizeof(struct ip_mreq)); + if (status) + { + char errStr[64]; + epicsSocketConvertErrnoToString(errStr, sizeof(errStr)); + fprintf(stderr, "Error setting IP_ADD_MEMBERSHIP: %s", errStr); + } + testOk1(status == 0); + + epicsSocketDestroy(socket); +} + MAIN(testInetAddressUtils) { - testPlan(51); + testPlan(60); testDiag("Tests for InetAddress utils"); test_getSocketAddressList(); @@ -171,6 +242,9 @@ MAIN(testInetAddressUtils) test_encodeAsIPv6Address(); test_isMulticastAddress(); test_getBroadcastAddresses(); + test_getLoopbackNIF(); + + test_multicast(); return testDone(); }