diff --git a/src/remote/abstractResponseHandler.cpp b/src/remote/abstractResponseHandler.cpp index 4c7c0ec..4c72d66 100644 --- a/src/remote/abstractResponseHandler.cpp +++ b/src/remote/abstractResponseHandler.cpp @@ -22,7 +22,7 @@ namespace epics { namespace pvAccess { void AbstractResponseHandler::handleResponse(osiSockAddr* responseFrom, - Transport::shared_pointer const & /*transport*/, int8 version, int8 command, + Transport::shared_pointer const & transport, int8 version, int8 command, size_t payloadSize, ByteBuffer* payloadBuffer) { if(_debugLevel >= 3) { // TODO make a constant of sth (0 - off, 1 - debug, 2 - more/trace, 3 - messages) char ipAddrStr[48]; @@ -30,7 +30,7 @@ namespace epics { ostringstream prologue; prologue<<"Message [0x"<getRemoteName(); hexDump(prologue.str(), _description, (const int8*)payloadBuffer->getArray(), diff --git a/src/remote/blockingUDPTransport.cpp b/src/remote/blockingUDPTransport.cpp index 8296d56..78ccca7 100644 --- a/src/remote/blockingUDPTransport.cpp +++ b/src/remote/blockingUDPTransport.cpp @@ -390,8 +390,11 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so (target == inetAddressType_broadcast_multicast && _isSendAddressUnicast[i])) continue; - LOG(logLevelDebug, "Sending to %d bytes to %s.", - buffer->getRemaining(), inetAddressToString((*_sendAddresses)[i]).c_str()); + if (IS_LOGGABLE(logLevelDebug)) + { + LOG(logLevelDebug, "Sending %d bytes to %s.", + buffer->getRemaining(), inetAddressToString((*_sendAddresses)[i]).c_str()); + } int retval = sendto(_channel, buffer->getArray(), buffer->getLimit(), 0, &((*_sendAddresses)[i].sa), diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 83d36bb..88df85c 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2914,7 +2914,7 @@ namespace epics { // to do the local multicast // - // locally broadcast if unicast (qosCode & 0x80 == 0x80) + // locally broadcast if unicast (qosCode & 0x80 == 0x80) via UDP // if ((qosCode & 0x80) == 0x80) { @@ -2923,11 +2923,8 @@ namespace epics { if (!context) return; - BlockingUDPTransport::shared_pointer bt = - //context->getLocalMulticastTransport(); - std::tr1::dynamic_pointer_cast(context->getSearchTransport()); - - if (bt) + BlockingUDPTransport::shared_pointer bt = dynamic_pointer_cast(transport); + if (bt && bt->hasLocalMulticastAddress()) { // clear unicast flag payloadBuffer->put(startPosition+4, (int8)(qosCode & ~0x80)); @@ -2938,7 +2935,7 @@ namespace epics { payloadBuffer->setPosition(payloadBuffer->getLimit()); // send will call flip() - bt->send(payloadBuffer, context->getLocalBroadcastAddress()); + bt->send(payloadBuffer, bt->getLocalMulticastAddress()); return; } } @@ -4417,11 +4414,6 @@ namespace epics { PVACCESS_REFCOUNT_MONITOR_DESTRUCT(remoteClientContext); }; - virtual const osiSockAddr& getLocalBroadcastAddress() const - { - return m_localBroadcastAddress; - } - private: void loadConfiguration() { @@ -4468,119 +4460,228 @@ namespace epics { */ bool initializeUDPTransport() { - // where to bind (listen) address - osiSockAddr listenLocalAddress; - listenLocalAddress.ia.sin_family = AF_INET; - listenLocalAddress.ia.sin_port = htons(m_broadcastPort); - listenLocalAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); - - // quary broadcast addresses of all IFs + // query broadcast addresses of all IFs SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0); - if (socket == INVALID_SOCKET) return false; - auto_ptr broadcastAddresses(getBroadcastAddresses(socket, m_broadcastPort)); - - int ifIndex = discoverInterfaceIndex(socket, &listenLocalAddress); - if (ifIndex == -1) + if (socket == INVALID_SOCKET) { - LOG(logLevelWarn, "Unable to find interface index for %s.", inetAddressToString(listenLocalAddress, false).c_str()); - // TODO fallback + LOG(logLevelError, "Failed to initialize broadcast UDP transport."); + return false; + } + + IfaceNodeVector ifaceList; + if (discoverInterfaces(ifaceList, socket, 0) || ifaceList.size() == 0) + { + LOG(logLevelError, "Failed to initialize broadcast UDP transport, no interfaces available."); + return false; } epicsSocketDestroy (socket); - // set broadcast address list - if (!m_addressList.empty()) - { - // if auto is true, add it to specified list - InetAddrVector* appendList = 0; - if (m_autoAddressList) - appendList = broadcastAddresses.get(); - auto_ptr list(getSocketAddressList(m_addressList, m_broadcastPort, appendList)); - if (list.get() && list->size()) { - // delete old list and take ownership of a new one - broadcastAddresses = list; - } - } - - if (!broadcastAddresses.get() || !broadcastAddresses->size()) - LOG(logLevelWarn, - "No broadcast addresses found or specified!"); - else - for (size_t i = 0; i < broadcastAddresses->size(); i++) - LOG(logLevelDebug, - "Broadcast address #%d: %s.", i, inetAddressToString((*broadcastAddresses)[i]).c_str()); - - ClientContextImpl::shared_pointer thisPointer = shared_from_this(); + //ClientContextImpl::shared_pointer thisPointer = shared_from_this(); TransportClient::shared_pointer nullTransportClient; - auto_ptr broadcastConnector(new BlockingUDPConnector(false, true, true)); - m_broadcastTransport = static_pointer_cast(broadcastConnector->connect( - nullTransportClient, m_responseHandler, - listenLocalAddress, PVA_PROTOCOL_REVISION, - PVA_DEFAULT_PRIORITY)); - if (!m_broadcastTransport.get()) - return false; - m_broadcastTransport->setSendAddresses(broadcastAddresses.get()); - // undefined address - osiSockAddr undefinedAddress; - undefinedAddress.ia.sin_family = AF_INET; - undefinedAddress.ia.sin_port = htons(0); - undefinedAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); + osiSockAddr anyAddress; + anyAddress.ia.sin_family = AF_INET; + anyAddress.ia.sin_port = htons(0); + anyAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); - auto_ptr searchConnector(new BlockingUDPConnector(false, false, true)); - m_searchTransport = static_pointer_cast(searchConnector->connect( + m_searchTransport = static_pointer_cast(broadcastConnector->connect( nullTransportClient, m_responseHandler, - undefinedAddress, PVA_PROTOCOL_REVISION, + anyAddress, PVA_PROTOCOL_REVISION, PVA_DEFAULT_PRIORITY)); if (!m_searchTransport.get()) return false; - m_searchTransport->setSendAddresses(broadcastAddresses.get()); - // TODO do not use searchBroadcast in future - // setup local broadcasting - // TODO configurable local NIF, address - osiSockAddr loAddr; - getLoopbackNIF(loAddr, "", 0); - if (true) + + + + InetAddrVector autoBCastAddr; + for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++) { - try + ifaceNode node = *iter; + + if (node.ifaceBCast.ia.sin_family != AF_UNSPEC) { - int lastAddr = 128 + ifIndex; - std::ostringstream o; - // TODO configurable - o << "224.0.0." << lastAddr; - - //osiSockAddr group; - aToIPAddr(o.str().c_str(), m_broadcastPort, &m_localBroadcastAddress.ia); - m_broadcastTransport->join(m_localBroadcastAddress, loAddr); - - // NOTE: this disables usage of multicast addresses in EPICS_PVA_ADDR_LIST - m_searchTransport->setMutlicastNIF(loAddr, true); - - //InetAddrVector sendAddressList; - //sendAddressList.push_back(group); - //m_searchTransport->setSendAddresses(&sendAddressList); - - LOG(logLevelDebug, "Local multicast enabled on %s using network interface %s.", - inetAddressToString(m_localBroadcastAddress).c_str(), inetAddressToString(loAddr, false).c_str()); + node.ifaceBCast.ia.sin_port = htons(m_broadcastPort); + autoBCastAddr.push_back(node.ifaceBCast); } - catch (std::exception& ex) + } + + // + // set beacon (broadcast) address list + // + + + if (!m_addressList.empty()) + { + // if auto is true, add it to specified list + if (!m_autoAddressList) + autoBCastAddr.clear(); + + auto_ptr list(getSocketAddressList(m_addressList, m_broadcastPort, &autoBCastAddr)); + if (list.get() && list->size()) { - LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what()); + m_searchTransport->setSendAddresses(list.get()); + } + else // TODO or no fallback at all + { + // fallback + // set default (auto) address list + m_searchTransport->setSendAddresses(&autoBCastAddr); } } else { - LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled."); + // fallback + // set default (auto) address list + m_searchTransport->setSendAddresses(&autoBCastAddr); } - // become active - m_broadcastTransport->start(); m_searchTransport->start(); + // debug output for broadcast addresses + InetAddrVector* blist = m_searchTransport->getSendAddresses(); + if (!blist || !blist->size()) + LOG(logLevelWarn, + "No broadcast addresses found or specified!"); + else + for (size_t i = 0; i < blist->size(); i++) + LOG(logLevelDebug, + "Broadcast address #%d: %s.", i, inetAddressToString((*blist)[i]).c_str()); + + + // TODO configurable local NIF, address + osiSockAddr loAddr; + getLoopbackNIF(loAddr, "", 0); + + for (IfaceNodeVector::const_iterator iter = ifaceList.begin(); iter != ifaceList.end(); iter++) + { + ifaceNode node = *iter; + + LOG(logLevelDebug, "Setting up UDP for interface %s, broadcast %s, index %d.", + inetAddressToString(node.ifaceAddr, false).c_str(), + inetAddressToString(node.ifaceBCast, false).c_str(), + node.ifaceIndex); + try + { + // where to bind (listen) address + // TODO opt copy + osiSockAddr listenLocalAddress; + listenLocalAddress.ia.sin_family = AF_INET; + listenLocalAddress.ia.sin_port = htons(m_broadcastPort); + listenLocalAddress.ia.sin_addr.s_addr = node.ifaceAddr.ia.sin_addr.s_addr; + + BlockingUDPTransport::shared_pointer transport = static_pointer_cast(broadcastConnector->connect( + nullTransportClient, m_responseHandler, + listenLocalAddress, PVA_PROTOCOL_REVISION, + PVA_DEFAULT_PRIORITY)); + listenLocalAddress = *transport->getRemoteAddress(); + + BlockingUDPTransport::shared_pointer transport2; + + if(node.ifaceBCast.ia.sin_family == AF_UNSPEC || + node.ifaceBCast.ia.sin_addr.s_addr == listenLocalAddress.ia.sin_addr.s_addr) { + LOG(logLevelWarn, "Unable to find broadcast address of interface %s.", inetAddressToString(node.ifaceBCast, false).c_str()); + } + #if !defined(_WIN32) + else + { + /* An oddness of BSD sockets (not winsock) is that binding to + * INADDR_ANY will receive unicast and broadcast, but binding to + * a specific interface address receives only unicast. The trick + * is to bind a second socket to the interface broadcast address, + * which will then receive only broadcasts. + */ + + // TODO opt copy + osiSockAddr bcastAddress; + bcastAddress.ia.sin_family = AF_INET; + bcastAddress.ia.sin_port = htons(m_broadcastPort); + bcastAddress.ia.sin_addr.s_addr = node.ifaceBCast.ia.sin_addr.s_addr; + + transport2 = static_pointer_cast(broadcastConnector->connect( + nullTransportClient, m_responseHandler, + bcastAddress, PVA_PROTOCOL_REVISION, + PVA_DEFAULT_PRIORITY)); + /* The other wrinkle is that nothing should be sent from this second + * socket. So replies are made through the unicast socket. + */ + transport2->setReplyTransport(transport); + } + #endif + + // TODO set ignore list on transport and transport2 + + + // + // Setup local broadcasting + // + // Each network interface gets its own multicast group on a local interface. + // Multicast address is determined by prefix 224.0.0.128 + NIF index + // + + osiSockAddr group; + int lastAddr = 128 + node.ifaceIndex; + std::ostringstream o; + // TODO configurable prefix and base + o << "224.0.0." << lastAddr; + aToIPAddr(o.str().c_str(), m_broadcastPort, &group.ia); + + transport->setMutlicastNIF(loAddr, true); + transport->setLocalMulticastAddress(group); + + BlockingUDPTransport::shared_pointer localMulticastTransport; + + if (true) + { + try + { + // NOTE: multicast receiver socket must be "bound" to INADDR_ANY or multicast address + localMulticastTransport = static_pointer_cast(broadcastConnector->connect( + nullTransportClient, m_responseHandler, + group, PVA_PROTOCOL_REVISION, + PVA_DEFAULT_PRIORITY)); + localMulticastTransport->join(group, loAddr); + + LOG(logLevelDebug, "Local multicast for %s enabled on %s/%s.", + inetAddressToString(listenLocalAddress, false).c_str(), + inetAddressToString(loAddr, false).c_str(), + inetAddressToString(group).c_str()); + } + catch (std::exception& ex) + { + LOG(logLevelDebug, "Failed to initialize local multicast, funcionality disabled. Reason: %s.", ex.what()); + } + } + else + { + LOG(logLevelDebug, "Failed to detect a loopback network interface, local multicast disabled."); + } + + transport->start(); + if(transport2) + transport2->start(); + if (localMulticastTransport) + localMulticastTransport->start(); + + m_udpTransports.push_back(transport); + m_udpTransports.push_back(transport2); + m_udpTransports.push_back(localMulticastTransport); + + } + catch (std::exception& e) + { + THROW_BASE_EXCEPTION_CAUSE("Failed to initialize broadcast UDP transport", e); + } + catch (...) + { + THROW_BASE_EXCEPTION("Failed to initialize broadcast UDP transport"); + } + } + return true; } @@ -4594,8 +4695,14 @@ namespace epics { destroyAllChannels(); // stop UDPs - m_searchTransport->close(); - m_broadcastTransport->close(); + for (BlockingUDPTransportVector::const_iterator iter = m_udpTransports.begin(); + iter != m_udpTransports.end(); iter++) + (*iter)->close(); + m_udpTransports.clear(); + + // stop UDPs + if (m_searchTransport) + m_searchTransport->close(); // wait for all transports to cleanly exit int tries = 40; @@ -4990,9 +5097,10 @@ namespace epics { Timer::shared_pointer m_timer; /** - * Broadcast transport needed to listen for broadcasts. + * UDP transports needed to receive channel searches. */ - BlockingUDPTransport::shared_pointer m_broadcastTransport; + typedef std::vector BlockingUDPTransportVector; + BlockingUDPTransportVector m_udpTransports; /** * UDP transport needed for channel searches. @@ -5105,8 +5213,6 @@ namespace epics { TransportRegistry::transportVector_t m_flushTransports; FlushStrategy m_flushStrategy; - - osiSockAddr m_localBroadcastAddress; }; namespace { diff --git a/src/remoteClient/clientContextImpl.h b/src/remoteClient/clientContextImpl.h index 06e7c02..9f70461 100644 --- a/src/remoteClient/clientContextImpl.h +++ b/src/remoteClient/clientContextImpl.h @@ -129,8 +129,6 @@ namespace epics { virtual void poll() = 0; virtual void destroy() = 0; - - virtual const osiSockAddr& getLocalBroadcastAddress() const = 0; }; epicsShareExtern ChannelProvider::shared_pointer createClientProvider(const Configuration::shared_pointer& conf);