From df4289b4e62a93cac832a66e946da2d7a545754a Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Wed, 8 Sep 2021 09:30:51 -0700 Subject: [PATCH] IPv6+mcast support --- src/client.cpp | 122 +++++++---- src/clientimpl.h | 10 +- src/config.cpp | 277 ++++++++++++++++++++---- src/describe.cpp | 2 +- src/evhelper.cpp | 388 ++++++++++++++++++++++++++++------ src/evhelper.h | 80 +++++-- src/os/WIN32/osdSockExt.cpp | 103 +++++++-- src/os/default/osdSockExt.cpp | 105 ++++++--- src/osiSockExt.h | 83 ++++++-- src/pvaproto.h | 10 +- src/pvxs/client.h | 9 +- src/server.cpp | 69 +++--- src/serverconn.cpp | 23 +- src/serverconn.h | 7 +- src/udp_collector.cpp | 179 +++++++++------- src/udp_collector.h | 16 +- src/unittest.cpp | 1 + src/util.cpp | 155 ++++++++++++-- src/utilpvt.h | 2 +- test/testsock.cpp | 85 +++++--- 20 files changed, 1311 insertions(+), 415 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index 17fbe51..ef34b4d 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -468,11 +468,15 @@ Value buildCAMethod() } ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop) - :effective(conf) + :canIPv6(evsocket::canIPv6()) + ,ifmap(IfaceMap::instance()) + ,effective(conf) ,caMethod(buildCAMethod()) - ,searchTx(AF_INET, SOCK_DGRAM, 0) + ,searchTx4(AF_INET, SOCK_DGRAM, 0) + ,searchTx6(AF_INET6, SOCK_DGRAM, 0) ,tcp_loop(tcp_loop) - ,searchRx(event_new(tcp_loop.base, searchTx.sock, EV_READ|EV_PERSIST, &ContextImpl::onSearchS, this)) + ,searchRx4(event_new(tcp_loop.base, searchTx4.sock, EV_READ|EV_PERSIST, &ContextImpl::onSearchS, this)) + ,searchRx6(event_new(tcp_loop.base, searchTx6.sock, EV_READ|EV_PERSIST, &ContextImpl::onSearchS, this)) ,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &ContextImpl::tickSearchS, this)) ,manager(UDPManager::instance()) ,beaconCleaner(event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::tickBeaconCleanS, this)) @@ -484,52 +488,60 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop) searchBuckets.resize(nBuckets); std::set bcasts; - for(auto& addr : searchTx.broadcasts()) { + for(auto& addr : searchTx4.broadcasts()) { addr.setPort(0u); bcasts.insert(addr); } + searchTx6.ipv6_only(); + { - osiSockAddr any{}; - any.ia.sin_family = AF_INET; - if(bind(searchTx.sock, &any.sa, sizeof(any.ia))) + auto any(SockAddr::any(searchTx4.af)); + if(bind(searchTx4.sock, &any->sa, any.size())) throw std::runtime_error("Unable to bind random UDP port"); - socklen_t alen = sizeof(any); - if(getsockname(searchTx.sock, &any.sa, &alen)) + socklen_t alen = any.capacity(); + if(getsockname(searchTx4.sock, &any->sa, &alen)) throw std::runtime_error("Unable to readback random UDP port"); - searchRxPort = ntohs(any.ia.sin_port); + searchRxPort = any.port(); log_debug_printf(setup, "Using UDP Rx port %u\n", searchRxPort); } - { - int val = 1; - if(setsockopt(searchTx.sock, SOL_SOCKET, SO_BROADCAST, (char *)&val, sizeof(val))) - log_err_printf(setup, "Unable to setup beacon sender SO_BROADCAST: %d\n", SOCKERRNO); + auto any(SockAddr::any(searchTx6.af, searchRxPort)); + if(bind(searchTx6.sock, &any->sa, any.size())) + throw std::runtime_error("Unable to bind random UDP6 port"); } - enable_SO_RXQ_OVFL(searchTx.sock); + + searchTx4.set_broadcast(true); + searchTx4.enable_SO_RXQ_OVFL(); + searchTx6.enable_SO_RXQ_OVFL(); for(auto& addr : effective.addressList) { - SockAddr saddr(AF_INET); + SockEndpoint ep; try { - saddr.setAddress(addr.c_str(), effective.udp_port); - }catch(std::runtime_error& e) { - log_err_printf(setup, "%s Ignoring %s\n", e.what(), addr.c_str()); + ep = SockEndpoint(addr, effective.udp_port); + }catch(std::exception& e){ + log_warn_printf(setup, "%s Ignoring malformed address %s\n", e.what(), addr.c_str()); continue; } - // if !bcast and !mcast - auto isucast = bcasts.find(saddr)==bcasts.end() && !saddr.isMCast(); + assert(ep.addr.family()==AF_INET || ep.addr.family()==AF_INET6); - log_info_printf(io, "Searching to %s%s\n", saddr.tostring().c_str(), (isucast?" unicast":"")); - searchDest.emplace_back(saddr, isucast); + // if !bcast and !mcast + auto isucast = !ep.addr.isMCast(); + + if(isucast && ep.addr.family()==AF_INET && bcasts.find(ep.addr)!=bcasts.end()) + isucast = false; + + log_info_printf(io, "Searching to %s%s\n", std::string(SB()<=0 && rx.ndrop!=0 && prevndrop!=rx.ndrop) { @@ -776,7 +801,7 @@ bool ContextImpl::onSearch() } if(nrx<0) { - int err = evutil_socket_geterror(searchTx.sock); + int err = evutil_socket_geterror(fd); if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) { // nothing to do here } else { @@ -844,7 +869,7 @@ void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw) // limit number of packets processed before going back to the reactor unsigned i; const unsigned limit = 40; - for(i=0; i(raw)->onSearch(); i++) {} + for(i=0; i(raw)->onSearch(fd); i++) {} log_debug_printf(io, "UDP search processed %u/%u\n", i, limit); }catch(std::exception& e){ @@ -976,20 +1001,27 @@ void ContextImpl::tickSearch(bool discover) to_wire(H, Header{CMD_SEARCH, 0, uint32_t(consumed-8u)}); } for(auto& pair : searchDest) { - if(pair.second) + auto& dest = pair.first.addr.family()==AF_INET ? searchTx4 : searchTx6; + + if(pair.second) { *pflags |= pva_search_flags::Unicast; - else + + } else { *pflags &= ~pva_search_flags::Unicast; - int ntx = sendto(searchTx.sock, (char*)searchMsg.data(), consumed, 0, &pair.first->sa, pair.first.size()); + dest.mcast_prep_sendto(pair.first); + } + + int ntx = sendto(dest.sock, (char*)searchMsg.data(), consumed, 0, + &pair.first.addr->sa, pair.first.addr.size()); if(ntx<0) { - int err = evutil_socket_geterror(searchTx.sock); + int err = evutil_socket_geterror(dest.sock); auto lvl = Level::Warn; if(err==EINTR || err==EPERM) lvl = Level::Debug; - log_printf(io, lvl, "Search tx error (%d) %s\n", - err, evutil_socket_error_to_string(err)); + log_printf(io, lvl, "Search tx %s error (%d) %s\n", + pair.first.addr.tostring().c_str(), err, evutil_socket_error_to_string(err)); } else if(unsigned(ntx) { SockAttach attach; + const bool canIPv6; + IfaceMap& ifmap; // "const" after ctor Config effective; @@ -235,7 +237,7 @@ struct ContextImpl : public std::enable_shared_from_this uint32_t nextCID=0x12345678; uint32_t prevndrop = 0u; - evsocket searchTx; + evsocket searchTx4, searchTx6; uint16_t searchRxPort; std::vector ignoreServerGUIDs; @@ -256,7 +258,7 @@ struct ContextImpl : public std::enable_shared_from_this std::vector searchMsg; // search destination address and whether to set the unicast flag - std::vector> searchDest; + std::vector> searchDest; size_t currentBucket = 0u; std::vector>> searchBuckets; @@ -274,7 +276,7 @@ struct ContextImpl : public std::enable_shared_from_this std::vector>> nameServers; evbase tcp_loop; - const evevent searchRx; + const evevent searchRx4, searchRx6; const evevent searchTimer; // beacon handling done on UDP worker. @@ -302,7 +304,7 @@ struct ContextImpl : public std::enable_shared_from_this void onBeacon(const UDPManager::Beacon& msg); - bool onSearch(); + bool onSearch(evutil_socket_t fd); static void onSearchS(evutil_socket_t fd, short evt, void *raw); void tickSearch(bool discover); static void tickSearchS(evutil_socket_t fd, short evt, void *raw); diff --git a/src/config.cpp b/src/config.cpp index 3b37247..cd7c77f 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -20,6 +20,7 @@ #include #include "serverconn.h" #include "clientimpl.h" +#include "utilpvt.h" #include "evhelper.h" DEFINE_LOGGER(serversetup, "pvxs.server.setup"); @@ -28,6 +29,116 @@ DEFINE_LOGGER(config, "pvxs.config"); namespace pvxs { +SockEndpoint::SockEndpoint(const char* ep, uint16_t defport) +{ + // + // , + // @ifacename + // ,@ifacename + auto comma = strchr(ep, ','); + auto at = strchr(ep, '@'); + + if(comma && at && comma > at) { + throw std::runtime_error(SB()<<'"'<(comma+1); + + } else if(comma) { + ttl = parseTo(std::string(comma+1, at-comma-1)); + } + + if(at) + iface = at+1; + } + + auto& ifmap = IfaceMap::instance(); + + if(addr.family()==AF_INET6) { + if(iface.empty() && addr->in6.sin6_scope_id) { + // interface index provide with IPv6 address + // we map back to symbolic name for storage + iface = ifmap.name_of(addr->in6.sin6_scope_id); + } + addr->in6.sin6_scope_id = 0; + + } else if(addr.family()==AF_INET && addr.isMCast() && !iface.empty()) { + SockAddr ifaddr(AF_INET); + + if(evutil_inet_pton(AF_INET, iface.c_str(), &ifaddr->in.sin_addr.s_addr)==1) { + // map interface address to symbolic name + + iface = ifmap.name_of(ifaddr); + } + } + + if(!iface.empty() && !ifmap.index_of(iface)) { + log_warn_printf(config, "Invalid interface address or name: \"%s\"\n", iface.c_str()); + } +} + +MCastMembership SockEndpoint::resolve() const +{ + if(!addr.isMCast()) + throw std::logic_error("not mcast"); + + auto& ifmap = IfaceMap::instance(); + + MCastMembership m; + m.af = addr.family(); + if(m.af==AF_INET) { + auto& req = m.req.in; + req.imr_multiaddr.s_addr = addr->in.sin_addr.s_addr; + + if(!iface.empty()) { + auto iface = ifmap.address_of(this->iface); + if(iface.family()==AF_INET) { + req.imr_interface.s_addr = iface->in.sin_addr.s_addr; + } + } + + } else if(m.af==AF_INET6) { + auto& req = m.req.in6; + req.ipv6mr_multiaddr = addr->in6.sin6_addr; + + if(!iface.empty()) { + req.ipv6mr_interface = ifmap.index_of(this->iface); + if(!req.ipv6mr_interface) { + log_warn_printf(config, "Unable to resolve interface '%s'\n", iface.c_str()); + } + } + + } else { + throw std::logic_error("Unsupported address family"); + } + return m; +} + +std::ostream& operator<<(std::ostream& strm, const SockEndpoint& addr) +{ + strm<& out, const std: pos = end; if(start>24)&0xff)<<'.'<<((ip>>16)&0xff)<<'.'<<((ip>>8)&0xff)<<'.'<<((ip>>0)&0xff); - if(addr.sin_port) - strm<<':'< parseAddresses(const std::vector& addrs, uint16_t defport=0) +{ + std::vector ret; + for(const auto& addr : addrs) { + try { + ret.emplace_back(addr, defport); + }catch(std::runtime_error& e){ + log_warn_printf(config, "Ignoring %s : %s\n", addr.c_str(), e.what()); + continue; + } + } + return ret; +} + +void printAddresses(std::vector& out, std::vector& inp) +{ + std::vector temp; + temp.reserve(inp.size()); + + for(auto& addr : inp) { + temp.emplace_back(SB()<& ifaces, - std::vector& addrs) +void expandAddrList(const std::vector& ifaces, + std::vector& addrs) { SockAttach attach; evsocket dummy(AF_INET, SOCK_DGRAM, 0); - std::vector bcasts; - - for(auto& addr : ifaces) { - SockAddr saddr(AF_INET); - try { - saddr.setAddress(addr.c_str()); - }catch(std::runtime_error& e){ - log_warn_printf(config, "%s Ignoring...\n", e.what()); + for(auto& saddr : ifaces) { + if(saddr.addr.family()!=AF_INET) continue; - } - for(auto& addr : dummy.broadcasts(&saddr)) { + for(auto& addr : dummy.broadcasts(&saddr.addr)) { addr.setPort(0u); - bcasts.push_back(addr.tostring()); + addrs.emplace_back(addr); } } - - addrs.reserve(addrs.size()+bcasts.size()); - for(auto& bcast : bcasts) { - addrs.push_back(std::move(bcast)); - } } -void removeDups(std::vector& addrs) +void addGroups(std::vector& ifaces, + const std::vector& addrs) { + auto& ifmap = IfaceMap::instance(); + std::set allifaces; + + for(const auto& addr : addrs) { + if(!addr.addr.isMCast()) + continue; + + if(!addr.iface.empty()) { + // interface already specified + ifaces.push_back(addr); + + } else { + // no interface specified, treat as wildcard + if(allifaces.empty()) + allifaces = ifmap.all_external(); + + for(auto& iface : allifaces) { + auto ifaceaddr(addr); + ifaceaddr.iface = iface; + ifaces.push_back(ifaceaddr); + } + } + } +} + +// remove duplicates while preserving order of first appearance +template +void removeDups(std::vector& addrs) +{ + std::sort(addrs.begin(), addrs.end()); addrs.erase(std::unique(addrs.begin(), addrs.end()), addrs.end()); } +// special handling for SockEndpoint where duplication is based on +// address,interface. Duplicates are combined with the longest TTL. +template<> +void removeDups(std::vector& addrs) +{ + std::map, size_t> seen; + for(size_t i=0; isecond]; + + if(ep.ttl > orig.ttl) { // w/ longer TTL + orig.ttl = ep.ttl; + } + + addrs.erase(addrs.begin()+i); + // 'ep' and 'orig' are invalidated + } + } +} + void enforceTimeout(double& tmo) { /* Inactivity timeouts with PVA have a long (and growing) history. @@ -287,22 +465,35 @@ void Config::updateDefs(defs_t& defs) const void Config::expand() { + if(tcp_port==0) + tcp_port = 5075; + + auto ifaces(parseAddresses(interfaces)); + auto bdest(parseAddresses(beaconDestinations)); + // empty interface address list implies the wildcard // (because no addresses isn't interesting...) - if(interfaces.empty()) { - interfaces.emplace_back("0.0.0.0"); + if(ifaces.empty()) { + ifaces.emplace_back(SockAddr::any(AF_INET)); } if(auto_beacon) { - expandAddrList(interfaces, beaconDestinations); + // use interface list add ipv4 broadcast addresses to beaconDestinations. + // 0.0.0.0 -> adds all bcasts + // otherwise add bcast for each iface address + expandAddrList(ifaces, bdest); + addGroups(ifaces, bdest); auto_beacon = false; } - removeDups(interfaces); - removeDups(beaconDestinations); + removeDups(ifaces); + printAddresses(interfaces, ifaces); + removeDups(bdest); + printAddresses(beaconDestinations, bdest); removeDups(ignoreAddrs); enforceTimeout(tcpTimeout); + } std::ostream& operator<<(std::ostream& strm, const Config& conf) @@ -419,15 +610,21 @@ void Config::expand() if(tcp_port==0) tcp_port = 5075; - if(interfaces.empty()) - interfaces.emplace_back("0.0.0.0"); + auto ifaces(parseAddresses(interfaces)); + auto addrs(parseAddresses(addressList)); + + if(ifaces.empty()) + ifaces.emplace_back(SockAddr::any(AF_INET)); if(autoAddrList) { - expandAddrList(interfaces, addressList); + expandAddrList(ifaces, addrs); + addGroups(ifaces, addrs); autoAddrList = false; } - removeDups(addressList); + printAddresses(interfaces, ifaces); + removeDups(addrs); + printAddresses(addressList, addrs); enforceTimeout(tcpTimeout); } diff --git a/src/describe.cpp b/src/describe.cpp index 2fe713a..91b30fd 100644 --- a/src/describe.cpp +++ b/src/describe.cpp @@ -103,7 +103,7 @@ std::ostream& target_information(std::ostream& strm) #endif auto localaddr(osiLocalAddr(dummy.sock)); - strm< "< "<\n"; Indented J(strm); diff --git a/src/evhelper.cpp b/src/evhelper.cpp index f7a19ac..0088305 100644 --- a/src/evhelper.cpp +++ b/src/evhelper.cpp @@ -44,6 +44,7 @@ namespace pvxs {namespace impl { DEFINE_LOGGER(logerr, "pvxs.loop"); DEFINE_LOGGER(logtimer, "pvxs.timer"); DEFINE_LOGGER(logiface, "pvxs.iface"); +DEFINE_LOGGER(logsock, "pvxs.sock"); namespace mdetail { VFunctor0::~VFunctor0() {} @@ -364,18 +365,22 @@ bool evbase::assertInRunningLoop() const throw std::logic_error("Not in running evbase worker"); } -evsocket::evsocket(evutil_socket_t sock) +evsocket::evsocket(int af, evutil_socket_t sock) :sock(sock) + ,af(af) { if(sock==evutil_socket_t(-1)) { - int err = SOCKERRNO; + int err = evutil_socket_geterror(sock); #ifdef _WIN32 if(err==WSANOTINITIALISED) { throw std::runtime_error("WSANOTINITIALISED"); } #endif - (void)err; - throw std::runtime_error("Unable to allocate socket"); + throw std::system_error(err, std::system_category()); + } + if(af!=AF_INET && af!=AF_INET6) { + evutil_closesocket(sock); + throw std::logic_error("Unsupported address family"); } evutil_make_socket_closeonexec(sock); @@ -392,7 +397,7 @@ evsocket::evsocket(evutil_socket_t sock) #endif evsocket::evsocket(int af, int type, int proto) - :evsocket(socket(af, type | SOCK_CLOEXEC, proto)) + :evsocket(af, socket(af, type | SOCK_CLOEXEC, proto)) { #ifdef __linux__ # ifndef IP_MULTICAST_ALL @@ -411,8 +416,10 @@ evsocket::evsocket(int af, int type, int proto) evsocket::evsocket(evsocket&& o) noexcept :sock(o.sock) + ,af(o.af) { o.sock = evutil_socket_t(-1); + o.af = AF_UNSPEC; } evsocket& evsocket::operator=(evsocket&& o) noexcept @@ -421,7 +428,9 @@ evsocket& evsocket::operator=(evsocket&& o) noexcept if(sock!=evutil_socket_t(-1)) evutil_closesocket(sock); sock = o.sock; + af = o.af; o.sock = evutil_socket_t(-1); + o.af = AF_UNSPEC; } return *this; } @@ -446,56 +455,124 @@ void evsocket::bind(SockAddr& addr) const log_err_printf(logerr, "Unable to fetch address of newly bound socket\n%s", ""); } -void evsocket::mcast_join(const SockAddr& grp, const SockAddr& iface) const +void evsocket::set_broadcast(bool b) const { - if(grp.family()!=iface.family() || grp.family()!=AF_INET) - throw std::invalid_argument("Unsupported address family"); - - ip_mreq req{}; - req.imr_multiaddr.s_addr = grp->in.sin_addr.s_addr; - req.imr_interface.s_addr = iface->in.sin_addr.s_addr; - - int ret = setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&req, sizeof(req)); - if(ret) - log_err_printf(logerr, "Unable to join mcast group %s on %s : %s\n", - grp.tostring().c_str(), iface.tostring().c_str(), - evutil_socket_error_to_string(evutil_socket_geterror(sock))); - - // IPV6_ADD_MEMBERSHIP + int val = b; + if(setsockopt(sock, SOL_SOCKET, SO_BROADCAST, (char *)&val, sizeof(val))) + log_err_printf(logerr, "Unable to setup beacon sender SO_BROADCAST: %d\n", SOCKERRNO); } -void evsocket::mcast_ttl(unsigned ttl) const -{ - int ret = setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, (char*)&ttl, sizeof(ttl)); - if(ret) - log_err_printf(logerr, "Unable to set mcast TTL : %s\n", - evutil_socket_error_to_string(evutil_socket_geterror(sock))); +#ifndef IPV6_ADD_MEMBERSHIP +# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP +#endif +#ifndef IPV6_DROP_MEMBERSHIP +# define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP +#endif - // ipv6 variant? +bool evsocket::mcast_join(const MCastMembership& m) const +{ + if(m.af==AF_INET) { + auto& req = m.req.in; + + if(setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&req, sizeof(req))) { + log_err_printf(logerr, "Unable to join mcast4 group: %s\n", + evutil_socket_error_to_string(evutil_socket_geterror(sock))); + return false; + } + + } else if(m.af==AF_INET6) { + auto& req = m.req.in6; + + if(setsockopt(sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, (char*)&req, sizeof(req))) { + log_err_printf(logerr, "Unable to join mcast6 group : %s\n", + evutil_socket_error_to_string(evutil_socket_geterror(sock))); + return false; + } + } + return true; +} + +void evsocket::mcast_leave(const MCastMembership &m) const +{ + if(m.af==AF_INET) { + if(setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP, (char*)&m.req.in, sizeof(m.req.in))) + log_err_printf(logerr, "Unable to leave mcast4 group: %s\n", + evutil_socket_error_to_string(evutil_socket_geterror(sock))); + + } else if(m.af==AF_INET6) { + if(setsockopt(sock, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, (char*)&m.req.in6, sizeof(m.req.in6))) + log_err_printf(logerr, "Unable to leave mcast6 group: %s\n", + evutil_socket_error_to_string(evutil_socket_geterror(sock))); + } +} + +void evsocket::mcast_prep_sendto(const SockEndpoint& ep) const +{ + if(ep.addr.family()!=af) + throw std::logic_error("Inconsistent address family or not mcast"); + + else if(!ep.addr.isMCast()) + return; + + auto& ifmap = IfaceMap::instance(); + + if(af==AF_INET) { + SockAddr iface(AF_INET); + if(!ep.iface.empty()) + iface = ifmap.address_of(ep.iface); + + if(setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, (char*)&ep.ttl, sizeof(ep.ttl))) + log_err_printf(logerr, "Unable to set mcast TTL : %s\n", + evutil_socket_error_to_string(evutil_socket_geterror(sock))); + + if(setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF, (char*)&iface->in.sin_addr, sizeof(iface->in.sin_addr))) + log_err_printf(logerr, "Unable to set mcast IF : %s\n", + evutil_socket_error_to_string(evutil_socket_geterror(sock))); + + } else if(af==AF_INET6) { + int index = 0u; + if(!ep.iface.empty()) + index = ifmap.index_of(ep.iface); + + if(setsockopt(sock, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, (char*)&ep.ttl, sizeof(ep.ttl))) + log_err_printf(logerr, "Unable to set mcast TTL : %s\n", + evutil_socket_error_to_string(evutil_socket_geterror(sock))); + + if(setsockopt(sock, IPPROTO_IPV6, IPV6_MULTICAST_IF, (char*)&index, sizeof(index))) + log_err_printf(logerr, "Unable to set mcast IF : %s\n", + evutil_socket_error_to_string(evutil_socket_geterror(sock))); + } } void evsocket::mcast_loop(bool loop) const { - unsigned char val = loop ? 1 : 0; - int ret = setsockopt(sock, IPPROTO_IP, IP_MULTICAST_LOOP, (char*)&val, sizeof(val)); - if(ret) - log_err_printf(logerr, "Unable to set mcast loopback : %s\n", - evutil_socket_error_to_string(evutil_socket_geterror(sock))); + /* On Linux (at least) IP_MULTICAST_LOOP is not exactly equivalent to IPV6_MULTICAST_LOOP, + * and we are allowed to set both. So we do... + */ + if(af==AF_INET || af==AF_INET6) { + unsigned char val = loop ? 1 : 0; + if(setsockopt(sock, IPPROTO_IP, IP_MULTICAST_LOOP, (char*)&val, sizeof(val))) + log_err_printf(logerr, "Unable to set mcast loopback4 : %s\n", + evutil_socket_error_to_string(evutil_socket_geterror(sock))); - // IPV6_MULTICAST_LOOP + } + if(af==AF_INET6) { + unsigned int val = loop ? 1 : 0; + if(setsockopt(sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, (char*)&val, sizeof(val))) + log_err_printf(logerr, "Unable to set mcast loopback6 : %s\n", + evutil_socket_error_to_string(evutil_socket_geterror(sock))); + } } -void evsocket::mcast_iface(const SockAddr& iface) const +void evsocket::ipv6_only(bool b) const { - if(iface.family()!=AF_INET) + if(af!=AF_INET6) throw std::invalid_argument("Unsupported address family"); - int ret = setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF, (char*)&iface->in.sin_addr, sizeof(iface->in.sin_addr)); - if(ret) - log_err_printf(logerr, "Unable to set mcast TTL : %s\n", + int v=b; + if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&v, sizeof(v))) + log_err_printf(logerr, "Unable to set IPv6 only : %s\n", evutil_socket_error_to_string(evutil_socket_geterror(sock))); - - // IPV6_MULTICAST_IF } std::vector evsocket::broadcasts(const SockAddr* match) const @@ -503,6 +580,11 @@ std::vector evsocket::broadcasts(const SockAddr* match) const if(match && match->family()!=AF_INET) { throw std::logic_error("osiSockDiscoverBroadcastAddresses() only understands AF_INET"); } + + std::vector ret; + if(af==AF_INET6) + return ret; // IPv6 does not have broadcast addresses + evsocket dummy(AF_INET, SOCK_DGRAM, 0); osiSockAddr realmatch; @@ -517,7 +599,6 @@ std::vector evsocket::broadcasts(const SockAddr* match) const ELLLIST bcasts = ELLLIST_INIT; osiSockDiscoverBroadcastAddresses(&bcasts, dummy.sock, &realmatch); - std::vector ret; ret.reserve(ellCount(&bcasts)); while(ellCount(&bcasts)) { @@ -525,55 +606,224 @@ std::vector evsocket::broadcasts(const SockAddr* match) const ellDelete(&bcasts, cur); osiSockAddrNode *node = CONTAINER(cur, osiSockAddrNode, node); if(node->addr.sa.sa_family==AF_INET) - ret.emplace_back(&node->addr.sa, sizeof(node->addr)); + ret.emplace_back(&node->addr.sa); free(node); } return ret; } +#if defined(_WIN32) && !defined(EAFNOSUPPORT) +# define EAFNOSUPPORT WSAESOCKTNOSUPPORT +#endif + +bool evsocket::canIPv6() +{ + auto sock = socket(AF_INET6, SOCK_DGRAM, 0); + if(sock!=evutil_socket_t(-1)) { + evutil_closesocket(sock); + return true; + } + int err = evutil_socket_geterror(sock); + if(err!=EAFNOSUPPORT) { + log_warn_printf(logsock, "Unexpected errno %d while probing IPv6: %s\n", + err, evutil_socket_error_to_string(err)); + } + return false; +} + #if EPICS_VERSION_INT +bool try_cache(IfaceMap& self, FN&& fn) +{ + bool force = false; +retry: + self.refresh(force); + bool found = fn(); + if(!found && !force) { + force = true; + goto retry; + } + return found; +} + +} // namespace + +bool IfaceMap::has_address(uint64_t ifindex, const SockAddr &addr) +{ + Guard G(lock); + if(addr.isAny()) return true; - auto now(epicsTime::getMonotonic()); - auto age = now-updated; - bool first = true; - -retry: - if(!first || age > 60) { - refresh(); - updated = now; - } else { - log_debug_printf(logiface, "using cache age %.2f sec\n", age); - } - - auto ifit(info.find(ifindex)); - if(ifit!=info.end()) { - const auto& iface = ifit->second; - auto adit(iface.find(addr)); - return adit!=iface.end(); - } - if(first) { - // re-try once - first = false; - goto retry; - } - log_warn_printf(logiface, "Encountered unknown interface index %lld\n", (long long)ifindex); - return false; + bool found = try_cache(*this, [this, ifindex, &addr]() { + auto ifit(byIndex.find(ifindex)); + if(ifit!=byIndex.end()) { + const auto& addrs = ifit->second.addrs; + return addrs.find(addr)!=addrs.end(); + } + return false; + }); + return found; } +std::string IfaceMap::name_of(uint64_t index) +{ + Guard G(lock); + + std::string name; + bool found = try_cache(*this, [this, index, &name](){ + auto it(byIndex.find(index)); + if(it!=byIndex.end()) { + name = it->second.name; + return true; + } + return false; + }); + if(!found) { + // fallback to numeric index + name = SB()<second.first->name; + return true; + } + return false; + }); + return name; +} + +uint64_t IfaceMap::index_of(const std::string& name) +{ + Guard G(lock); + + uint64_t ret = 0u; + try_cache(*this, [&ret, this, name]() { + auto it = byName.find(name); + bool hit = it!=byName.end(); + if(hit) + ret = it->second->index; + return hit; + }); + return ret; +} + +bool IfaceMap::is_address(const SockAddr& addr) +{ + Guard G(lock); + + return try_cache(*this, [this, addr]() { + return byAddr.find(addr)!=byAddr.end(); + }); +} + +bool IfaceMap::is_broadcast(const SockAddr& addr) +{ + Guard G(lock); + + return try_cache(*this, [this, addr]() { + auto it(byAddr.find(addr)); + return it!=byAddr.end() && it->second.second; + }); +} + +SockAddr IfaceMap::address_of(const std::string& name) +{ + Guard G(lock); + + SockAddr ret; + try_cache(*this, [this, name, &ret]() { + auto it(byName.find(name)); + if(it!=byName.end() && !it->second->addrs.empty()) { + ret = it->second->addrs.begin()->first; + } + return false; + }); + return ret; +} + +std::set IfaceMap::all_external() +{ + std::set ret; + Guard G(lock); + refresh(); + for(auto& pair : byIndex) { + ret.emplace(pair.second.name); + } + return ret; +} + + void to_wire(Buffer& buf, const SockAddr& val) { if(!buf.ensure(16)) { diff --git a/src/evhelper.h b/src/evhelper.h index adae254..9d98a0f 100644 --- a/src/evhelper.h +++ b/src/evhelper.h @@ -193,12 +193,13 @@ void from_wire(Buffer &buf, SockAddr& val); struct PVXS_API evsocket { evutil_socket_t sock; + int af; // default construct an invalid socket - constexpr evsocket() noexcept :sock(-1) {} + constexpr evsocket() noexcept :sock(-1), af(AF_UNSPEC) {} // construct from a valid (not -1) socket - explicit evsocket(evutil_socket_t sock); + explicit evsocket(int af, evutil_socket_t sock); // create a new socket evsocket(int, int, int); @@ -217,33 +218,80 @@ struct PVXS_API evsocket inline operator bool() const { return sock!=-1; } void bind(SockAddr& addr) const; - //! join mcast group. Receive mcasts send to this group which arrive on the given interface - //! @see IP_ADD_MEMBERSHIP - void mcast_join(const SockAddr& grp, const SockAddr& iface) const; - //! Set time-to-live out mcasts sent from this socket - //! @see IP_MULTICAST_TTL - void mcast_ttl(unsigned ttl) const; + + void set_broadcast(bool b) const; + + //! Join multicast group, optionally on selected interface + bool mcast_join(const MCastMembership& m) const; + //! Reverse previous join + void mcast_leave(const MCastMembership& m) const; + //! Prepare socket for subsequent sendto() with TTL and output interface + void mcast_prep_sendto(const SockEndpoint& ep) const; + //! Whether mcasts sent from this socket should be received to local listeners - //! @see IP_MULTICAST_LOOP + //! @see IP_MULTICAST_LOOP and IPV6_MULTICAST_LOOP void mcast_loop(bool loop) const; - //! Selects interface to use when sending mcasts - //! @see IP_MULTICAST_IF - void mcast_iface(const SockAddr& iface) const; + //! Disable IPv4 through IPv6 socket + void ipv6_only(bool b=true) const; + + //! Linux specific include OS dropped packet counter as cmsg + void enable_SO_RXQ_OVFL() const; + + void enable_IP_PKTINFO() const; //! wraps osiSockDiscoverBroadcastAddresses() std::vector broadcasts(const SockAddr* match=nullptr) const; + + static + bool canIPv6(); }; struct PVXS_API IfaceMap { + static + IfaceMap& instance(); + static + void cleanup(); + IfaceMap(); - // return true if ifindex is valid, and addr is one of the addresses currently assigned to it. - bool has_address(int64_t ifindex, const SockAddr& addr); + // return true if ifindex is valid, and addr an interface address assigned to it. + bool has_address(uint64_t ifindex, const SockAddr& addr); + // lookup interface name by index + std::string name_of(uint64_t index); + // find (an) interface name with this address. useful for IPv4. returns empty string if not found. + std::string name_of(const SockAddr& addr); + // returns 0 if not found + uint64_t index_of(const std::string& name); + // is this a valid interface or broadcast address? + bool is_address(const SockAddr& addr); + // is this a valid interface or broadcast address? + bool is_broadcast(const SockAddr& addr); + // look up interface address. useful for IPV4. returns AF_UNSPEC if not found + SockAddr address_of(const std::string& name); + // all interface names except LO + std::set all_external(); - void refresh(); + // caller must hold lock + void refresh(bool force=false); - std::map> info; + struct Iface { + std::string name; + uint64_t index; + bool isLO; + // interface address(s) -> (maybe) broadcast addr + std::map addrs; + Iface(const std::string& name, uint64_t index, bool isLO) :name(name), index(index), isLO(isLO) {} + }; + + epicsMutex lock; + std::map byIndex; + std::map byName; + // map address to tuple of interface and broadcast? + std::multimap, SockAddrOnlyLess> byAddr; epicsTime updated; +private: + static + decltype (byIndex) _refresh(); }; } // namespace impl diff --git a/src/os/WIN32/osdSockExt.cpp b/src/os/WIN32/osdSockExt.cpp index 33d9e82..e0d59a5 100644 --- a/src/os/WIN32/osdSockExt.cpp +++ b/src/os/WIN32/osdSockExt.cpp @@ -54,13 +54,20 @@ void osiSockAttachExt() epicsThreadOnce(&oseOnce, &oseDoOnce, nullptr); } -void enable_SO_RXQ_OVFL(SOCKET sock) {} +void evsocket::enable_SO_RXQ_OVFL() const {} -void enable_IP_PKTINFO(SOCKET sock) +void evsocket::enable_IP_PKTINFO() const { - int val = 1; - if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val))) - log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO); + if(af==AF_INET) { + int val = 1; + if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val))) + log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO); + + } else if(af==AF_INET6) { + int val = 1; + if(setsockopt(sock, IPPROTO_IPV6, IPV6_PKTINFO, (char*)&val, sizeof(val))) + log_warn_printf(log, "Unable to set IPV6_RECVPKTINFO: %d\n", SOCKERRNO); + } } int recvfromx::call() @@ -77,7 +84,8 @@ int recvfromx::call() msg.name = &(*src)->sa; msg.namelen = src->size(); - alignas (alignof (WSACMSGHDR)) char cbuf[WSA_CMSG_SPACE(sizeof(in_pktinfo))]; + // will receive either in6_pktinfo or in_pktinfo, not both. in6_pktinfo is larger + alignas (WSACMSGHDR) char cbuf[WSA_CMSG_SPACE(sizeof(in6_pktinfo))]; msg.Control = {sizeof(cbuf), cbuf}; DWORD nrx=0u; @@ -96,6 +104,16 @@ int recvfromx::call() memcpy(&idx, WSA_CMSG_DATA(hdr) + offsetof(in_pktinfo, ipi_ifindex), sizeof(idx)); dstif = idx; } + else if(hdr->cmsg_level==IPPROTO_IPV6 && hdr->cmsg_type==IPV6_PKTINFO && hdr->cmsg_len>=WSA_CMSG_LEN(sizeof(in6_pktinfo))) { + if(dst) { + (*dst)->in6.sin6_family = AF_INET6; + memcpy(&(*dst)->in6.sin6_addr, WSA_CMSG_DATA(hdr) + offsetof(in6_pktinfo, ipi6_addr), sizeof(in6_addr)); + } + + decltype(in6_pktinfo::ipi6_ifindex) idx; + memcpy(&idx, WSA_CMSG_DATA(hdr) + offsetof(in6_pktinfo, ipi6_ifindex), sizeof(idx)); + dstif = idx; + } } return nrx; @@ -111,12 +129,16 @@ namespace impl { # define GAA_FLAG_INCLUDE_ALL_INTERFACES 0 #endif -void IfaceMap::refresh() { +decltype (IfaceMap::byIndex) IfaceMap::_refresh() { std::vector ifaces(1024u); - decltype (info) temp; + decltype (byIndex) temp; { - constexpr ULONG flags = GAA_FLAG_SKIP_ANYCAST|GAA_FLAG_SKIP_MULTICAST|GAA_FLAG_SKIP_DNS_SERVER|GAA_FLAG_INCLUDE_ALL_INTERFACES; + constexpr ULONG flags = GAA_FLAG_SKIP_ANYCAST + |GAA_FLAG_SKIP_MULTICAST + |GAA_FLAG_SKIP_DNS_SERVER + |GAA_FLAG_INCLUDE_PREFIX + |GAA_FLAG_INCLUDE_ALL_INTERFACES; ULONG buflen = ifaces.size(); auto err = GetAdaptersAddresses(AF_INET, flags, 0, reinterpret_cast(ifaces.data()), &buflen); @@ -130,28 +152,73 @@ void IfaceMap::refresh() { if(err) { log_warn_printf(logiface, "Unable to GetAdaptersAddresses() error=%lld\n", (unsigned long long)err); - return; + return temp; } } for(auto iface = reinterpret_cast(ifaces.data()); iface ; iface = iface->Next) { - auto& info = temp[iface->IfIndex]; - //TODO: any flags to check? + if(!(iface->OperStatus & IfOperStatusUp)) + continue; // not configured, skip... - for(auto addr = iface->FirstUnicastAddress; addr; addr = addr->Next) { + // TODO: IfIndex vs. Ipv6IfIndex which one to use? - if(addr->Address.lpSockaddr->sa_family!=AF_INET) + bool isLO = iface->IfType==IF_TYPE_SOFTWARE_LOOPBACK; + auto pair = temp.emplace(std::piecewise_construct, + std::forward_as_tuple(iface->IfIndex), + std::forward_as_tuple(iface->AdapterName, iface->IfIndex, isLO)); + + auto& info = pair.first->second; + + // find the IPv4 broadcast address, if any + std::set> prefixes; + for(auto prefix = iface->FirstPrefix; prefix; prefix = prefix->Next) { + SockAddr addr(prefix->Address.lpSockaddr); + auto p = prefix->PrefixLength; + + if(addr.family()!=AF_INET || p<=0u || p>=32u) continue; - auto pair = info.emplace(addr->Address.lpSockaddr, sizeof(sockaddr_in)); + sockaddr_in mask{AF_INET}; + mask.sin_addr.s_addr = htonl(0xffffffff<<(32u-p)); + auto pair = prefixes.emplace(addr, (sockaddr*)&mask); - log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %s\n", - (long long)iface->IfIndex, iface->AdapterName, pair.first->tostring().c_str()); + log_debug_printf(logiface, "Prefix %s/%s\n", addr.tostring().c_str(), pair.first->second.tostring().c_str()); + } + + for(auto addr = iface->FirstUnicastAddress; addr; addr = addr->Next) { + const auto af = addr->Address.lpSockaddr->sa_family; + if(af!=AF_INET && af!=AF_INET6) { + log_debug_printf(logiface, "Ignoring interface '%s' address family=%d\n", + iface->AdapterName, af); + continue; + } + + SockAddr iaddr(addr->Address.lpSockaddr); + SockAddr bcast; + if(iaddr.family()==AF_INET && !isLO) { + auto A = ntohl(iaddr->in.sin_addr.s_addr); + for(auto& pair : prefixes) { + auto P = ntohl(pair.first->in.sin_addr.s_addr); + auto M = ntohl(pair.second->in.sin_addr.s_addr); + if((A&M)==P) { + auto B = P | ~M; + bcast->in.sin_family = AF_INET; + bcast->in.sin_addr.s_addr = htonl(B); + } + } + } + + auto pair = info.addrs.emplace(iaddr, bcast); + + log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %s/%s\n", + (long long)iface->IfIndex, iface->AdapterName, + pair.first->first.tostring().c_str(), + pair.first->second.tostring().c_str()); } } - info.swap(temp); + return temp; } } // namespace impl diff --git a/src/os/default/osdSockExt.cpp b/src/os/default/osdSockExt.cpp index c812777..d2c1216 100644 --- a/src/os/default/osdSockExt.cpp +++ b/src/os/default/osdSockExt.cpp @@ -4,6 +4,11 @@ * in file LICENSE that is included with this distribution. */ +#ifdef __APPLE__ +// expose IPV6_PKTINFO +# define __APPLE_USE_RFC_3542 +#endif + #include "osiSockExt.h" #include @@ -19,6 +24,11 @@ extern "C" { } #endif +// some *BSD (OSX but not RTEMS5/libbsd) use IPV6_PKTINFO to enable RX +#if defined(IPV6_PKTINFO) && !defined(IPV6_RECVPKTINFO) +# define IPV6_RECVPKTINFO IPV6_PKTINFO +#endif + #include #include @@ -29,7 +39,7 @@ DEFINE_LOGGER(logiface, "pvxs.iface"); void osiSockAttachExt() {} -void enable_SO_RXQ_OVFL(SOCKET sock) +void evsocket::enable_SO_RXQ_OVFL() const { #ifdef SO_RXQ_OVFL // Linux specific feature exposes OS dropped packet count @@ -40,34 +50,42 @@ void enable_SO_RXQ_OVFL(SOCKET sock) #endif } -void enable_IP_PKTINFO(SOCKET sock) +void evsocket::enable_IP_PKTINFO() const { - /* linux, some *BSD's (OSX), and winsock package both destination address (from ip header) + if(af==AF_INET) { + + /* linux, some *BSD's (OSX), and winsock package both destination address (from ip header) * and receiving interface index (from host) into one IP_PKTINFO control message. * Remaining *BSD's can deliver these in separate IP_ORIGDSTADDR and IP_RECVIF messages. */ #ifdef IP_PKTINFO - int val = 1; - if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val))) - log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO); + int val = 1; + if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val))) + log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO); #else # ifdef IP_ORIGDSTADDR - { - int val = 1; - if(setsockopt(sock, IPPROTO_IP, IP_ORIGDSTADDR, (char*)&val, sizeof(val))) - log_warn_printf(log, "Unable to set IP_ORIGDSTADDR: %d\n", SOCKERRNO); - } + { + int val = 1; + if(setsockopt(sock, IPPROTO_IP, IP_ORIGDSTADDR, (char*)&val, sizeof(val))) + log_warn_printf(log, "Unable to set IP_ORIGDSTADDR: %d\n", SOCKERRNO); + } # endif # ifdef IP_RECVIF - { - int val = 1; - if(setsockopt(sock, IPPROTO_IP, IP_RECVIF, (char*)&val, sizeof(val))) - log_warn_printf(log, "Unable to set IP_RECVIF: %d\n", SOCKERRNO); - } + { + int val = 1; + if(setsockopt(sock, IPPROTO_IP, IP_RECVIF, (char*)&val, sizeof(val))) + log_warn_printf(log, "Unable to set IP_RECVIF: %d\n", SOCKERRNO); + } # endif #endif + + } else if(af==AF_INET6) { + int val = 1; + if(setsockopt(sock, IPPROTO_IPV6, IPV6_RECVPKTINFO, (char*)&val, sizeof(val))) + log_warn_printf(log, "Unable to set IPV6_PKTINFO reception: %d\n", SOCKERRNO); + } } int recvfromx::call() @@ -81,10 +99,12 @@ int recvfromx::call() msg.msg_name = &(*src)->sa; msg.msg_namelen = src ? src->size() : 0u; - alignas (alignof (cmsghdr)) char cbuf[0u + alignas (cmsghdr) char cbuf[0u #ifdef SO_RXQ_OVFL + CMSG_SPACE(sizeof(ndrop)) #endif + // only need space for IPv4 option(s) or IPv6 option, never both. + + impl::cmax(0 #ifdef IP_PKTINFO + CMSG_SPACE(sizeof(in_pktinfo)) #else @@ -95,6 +115,9 @@ int recvfromx::call() + CMSG_SPACE(sizeof(sockaddr_dl)) # endif #endif + ,0 + + CMSG_SPACE(sizeof(in6_pktinfo)) + ) // cmax ]; msg.msg_control = cbuf; msg.msg_controllen = sizeof(cbuf); @@ -143,6 +166,16 @@ int recvfromx::call() } # endif #endif + else if(hdr->cmsg_level==IPPROTO_IPV6 && hdr->cmsg_type==IPV6_PKTINFO && hdr->cmsg_len>=CMSG_LEN(sizeof(in6_pktinfo))) { + if(dst) { + (*dst)->in6.sin6_family = AF_INET6; + memcpy(&(*dst)->in6.sin6_addr, CMSG_DATA(hdr) + offsetof(in6_pktinfo, ipi6_addr), sizeof(in6_addr)); + } + + decltype(in6_pktinfo::ipi6_ifindex) idx; + memcpy(&idx, CMSG_DATA(hdr) + offsetof(in6_pktinfo, ipi6_ifindex), sizeof(idx)); + dstif = idx; + } } } @@ -151,20 +184,22 @@ int recvfromx::call() namespace impl { -void IfaceMap::refresh() { +decltype (IfaceMap::byIndex) IfaceMap::_refresh() { ifaddrs* addrs = nullptr; - decltype (info) temp; + decltype (byIndex) temp; if(getifaddrs(&addrs)) { log_warn_printf(logiface, "Unable to getifaddrs() errno=%d\n", errno); - return; + return temp; } try { for(const ifaddrs* ifa = addrs; ifa; ifa = ifa->ifa_next) { - if(ifa->ifa_addr->sa_family!=AF_INET) { - log_debug_printf(logiface, "Ignoring interface '%s' address !ipv4\n", ifa->ifa_name); + const auto af = ifa->ifa_addr->sa_family; + if((af!=AF_INET && af!=AF_INET6) || ifa->ifa_name[0]=='\0') { + log_debug_printf(logiface, "Ignoring interface '%s' address family=%d\n", + ifa->ifa_name, af); continue; } @@ -174,12 +209,28 @@ void IfaceMap::refresh() { continue; } - //TODO: any flags to check? + if(!(ifa->ifa_flags&IFF_UP)) + continue; // not configured, skip... - auto pair = temp[idx].emplace(ifa->ifa_addr, sizeof(sockaddr_in)); + auto it = temp.find(idx); + if(it==temp.end()) { + // encountering new index + bool isLO = ifa->ifa_flags&IFF_LOOPBACK; + auto pair = temp.emplace(std::piecewise_construct, + std::forward_as_tuple(idx), + std::forward_as_tuple(ifa->ifa_name, idx, isLO)); + assert(pair.second); + it = pair.first; + } - log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %s\n", - (long long)idx, ifa->ifa_name, pair.first->tostring().c_str()); + // IFF_BROADCAST does not apply to IPv6 + bool hasB = ifa->ifa_addr->sa_family==AF_INET && (ifa->ifa_flags&IFF_BROADCAST); + + auto pair = it->second.addrs.emplace(SockAddr(ifa->ifa_addr), + SockAddr(hasB ? ifa->ifa_broadaddr : nullptr)); + + log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %d %s\n", + (long long)idx, ifa->ifa_name, af, pair.first->first.tostring().c_str()); } } catch(...){ @@ -188,7 +239,7 @@ void IfaceMap::refresh() { } freeifaddrs(addrs); - info.swap(temp); + return temp; } } // namespace impl diff --git a/src/osiSockExt.h b/src/osiSockExt.h index b39b3d0..a9ed34a 100644 --- a/src/osiSockExt.h +++ b/src/osiSockExt.h @@ -10,11 +10,21 @@ #include #include +#include #include #include +// added with Base 3.15 +#ifndef SOCK_EADDRNOTAVAIL +# ifdef _WIN32 +# define SOCK_EADDRNOTAVAIL WSAEADDRNOTAVAIL +# else +# define SOCK_EADDRNOTAVAIL EADDRNOTAVAIL +# endif +#endif + namespace pvxs { PVXS_API @@ -39,16 +49,16 @@ private: public: explicit SockAddr(int af = AF_UNSPEC); - explicit SockAddr(int af, const char *address, unsigned short port=0); - explicit SockAddr(const sockaddr *addr, ev_socklen_t len); - inline explicit SockAddr(int af, const std::string& address) :SockAddr(af, address.c_str()) {} + explicit SockAddr(const char *address, unsigned short port=0); + explicit SockAddr(const sockaddr *addr); + inline explicit SockAddr(const std::string& address, unsigned short port=0) :SockAddr(address.c_str(), port) {} - size_t size() const; + size_t size() const noexcept; inline size_t capacity() const { return sizeof(store); } - inline unsigned short family() const { return store.sa.sa_family; } - unsigned short port() const; + inline unsigned short family() const noexcept { return store.sa.sa_family; } + unsigned short port() const noexcept; void setPort(unsigned short port); SockAddr withPort(unsigned short port) const { SockAddr temp(*this); @@ -57,10 +67,15 @@ public: } void setAddress(const char *, unsigned short port=0); + inline void setAddress(const std::string& s, unsigned short port=0) { + setAddress(s.c_str(), port); + } - bool isAny() const; - bool isLO() const; - bool isMCast() const; + bool isAny() const noexcept; + bool isLO() const noexcept; + bool isMCast() const noexcept; + + SockAddr map4to6() const; store_t* operator->() { return &store; } const store_t* operator->() const { return &store; } @@ -95,11 +110,53 @@ struct SockAddrOnlyLess { PVXS_API std::ostream& operator<<(std::ostream& strm, const SockAddr& addr); -// Linux specific include OS dropped packet counter as cmsg -void enable_SO_RXQ_OVFL(SOCKET sock); -// Include destination address as cmsg +// resolved multicast group membership request +struct MCastMembership { + int af = AF_UNSPEC; + union { + ip_mreq in; + ipv6_mreq in6; + } req{}; + bool operator<(const MCastMembership& o) const { + if(af==o.af) { + if(af==AF_INET) + return memcmp(&req.in, &o.req.in, sizeof(o.req.in)); + else + return memcmp(&req.in6, &o.req.in6, sizeof(o.req.in6)); + } + return af + * , + * @iface + * ,@iface + */ +struct PVXS_API SockEndpoint { + SockAddr addr; // ucast, mcast, or bcast + // if mcast, then output TTL and interface + int ttl=-1; + std::string iface; + + SockEndpoint() = default; + SockEndpoint(const char* ep, uint16_t defport=0); + SockEndpoint(const std::string& ep, uint16_t defport=0) :SockEndpoint(ep.c_str(), defport) {} + explicit SockEndpoint(const SockAddr& addr) :addr(addr) {} + + MCastMembership resolve() const; +}; + PVXS_API -void enable_IP_PKTINFO(SOCKET sock); +std::ostream& operator<<(std::ostream& strm, const SockEndpoint& addr); + +PVXS_API +bool operator==(const SockEndpoint& lhs, const SockEndpoint& rhs); + +inline +bool operator!=(const SockEndpoint& lhs, const SockEndpoint& rhs) { return !(lhs==rhs); } struct recvfromx { evutil_socket_t sock; diff --git a/src/pvaproto.h b/src/pvaproto.h index db0afa7..17f0dcc 100644 --- a/src/pvaproto.h +++ b/src/pvaproto.h @@ -589,8 +589,13 @@ enum class pva_subcmd { }; struct Header { - uint8_t cmd, flags; - uint32_t len; + uint8_t cmd=0u, flags=0u, version=0u; + uint32_t len=0u; + constexpr Header() {} + explicit + constexpr Header(uint8_t cmd, uint8_t flags, uint32_t len) + :cmd(cmd), flags(flags), version(0u), len(len) + {} }; template @@ -622,6 +627,7 @@ void from_wire(Buf& buf, Header& H) } else { H.cmd = buf[3]; H.flags = buf[2]; + H.version = buf[1]; // Set/change buffer endianness buf.be = H.flags&pva_flags::MSB; buf.skip(4u, __FILE__, __LINE__); diff --git a/src/pvxs/client.h b/src/pvxs/client.h index a996f2c..45c6484 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -940,10 +940,15 @@ public: DiscoverBuilder Context::discover(std::function && fn) { return DiscoverBuilder(pvt, std::move(fn)); } struct PVXS_API Config { - //! List of unicast and broadcast addresses + /** List of unicast, multicast, and broadcast addresses to which search requests will be sent. + * + * Entries may take the forms: + * - [:] + * - [:][,][@] + */ std::vector addressList; - //! List of interface addresses on which beacons may be received. + //! List of local interface addresses on which beacons may be received. //! Also constrains autoAddrList to only consider broadcast addresses of listed interfaces. //! Empty implies wildcard 0.0.0.0 std::vector interfaces; diff --git a/src/server.cpp b/src/server.cpp index 50f77d0..bfaec86 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -375,10 +375,12 @@ std::ostream& operator<<(std::ostream& strm, const Server& serv) } Server::Pvt::Pvt(const Config &conf) - :effective(conf) + :canIPv6(evsocket::canIPv6()) + ,effective(conf) ,beaconMsg(128) ,acceptor_loop("PVXTCP", epicsThreadPriorityCAServerLow-2) - ,beaconSender(AF_INET, SOCK_DGRAM, 0) + ,beaconSender4(AF_INET, SOCK_DGRAM, 0) + ,beaconSender6(AF_INET6, SOCK_DGRAM, 0) ,beaconTimer(event_new(acceptor_loop.base, -1, EV_TIMEOUT, doBeaconsS, this)) ,searchReply(0x10000) ,builtinsrc(StaticSource::build()) @@ -386,62 +388,74 @@ Server::Pvt::Pvt(const Config &conf) { effective.expand(); - { - int val = 1; - if(setsockopt(beaconSender.sock, SOL_SOCKET, SO_BROADCAST, (char *)&val, sizeof(val))) - log_err_printf(serversetup, "Unable to setup beacon sender SO_BROADCAST: %d\n", SOCKERRNO); - } + beaconSender4.set_broadcast(true); auto manager = UDPManager::instance(); evsocket dummy(AF_INET, SOCK_DGRAM, 0); - for(const auto& iface : effective.interfaces) { - SockAddr addr(AF_INET, iface.c_str()); - addr.setPort(effective.udp_port); + const auto cb(std::bind(&Pvt::onSearch, this, std::placeholders::_1)); - listeners.push_back(manager.onSearch(addr, - std::bind(&Pvt::onSearch, this, std::placeholders::_1) )); + std::vector tcpifaces; // may have port zero + for(const auto& iface : effective.interfaces) { + SockEndpoint addr(iface.c_str()); + if(!addr.addr.isMCast()) + tcpifaces.push_back(addr.addr); + + addr.addr.setPort(effective.udp_port); + + listeners.push_back(manager.onSearch(addr, cb)); // update to allow udp_port==0 - effective.udp_port = addr.port(); + effective.udp_port = addr.addr.port(); + + if(addr.addr.family()==AF_INET && addr.addr.isAny()) { + // if listening on 0.0.0.0, also listen on [::] + auto any6(addr); + any6.addr = SockAddr::any(AF_INET6); + + listeners.push_back(manager.onSearch(any6, cb)); + } #ifndef _WIN32 - if(!addr.isAny()) { + if(addr.addr.family()==AF_INET && !addr.addr.isAny() && !addr.addr.isMCast()) { /* An oddness of BSD sockets (not winsock) is that binding to * INADDR_ANY will receive unicast and broadcast, but binding to * a specific interface address receives only unicast. The trick * is to bind a second socket to the interface broadcast address, * which will then receive only broadcasts. */ - for(auto bcast : dummy.broadcasts(&addr)) { - bcast.setPort(addr.port()); - listeners.push_back(manager.onSearch(bcast, - std::bind(&Pvt::onSearch, this, std::placeholders::_1) )); + for(auto bcast : dummy.broadcasts(&addr.addr)) { + bcast.setPort(addr.addr.port()); + listeners.push_back(manager.onSearch(bcast, cb)); } } #endif } for(const auto& addr : effective.ignoreAddrs) { - SockAddr temp(AF_INET, addr.c_str()); + SockAddr temp(addr.c_str()); ignoreList.push_back(temp); } - acceptor_loop.call([this](){ + acceptor_loop.call([this, &tcpifaces](){ // from accepter worker bool firstiface = true; - for(const auto& addr : effective.interfaces) { - interfaces.emplace_back(addr, effective.tcp_port, this, firstiface); + for(auto& addr : tcpifaces) { + if(addr.port()==0) + addr.setPort(effective.tcp_port); + + interfaces.emplace_back(addr, this, firstiface); + if(firstiface || effective.tcp_port==0) effective.tcp_port = interfaces.back().bind_addr.port(); firstiface = false; } for(const auto& addr : effective.beaconDestinations) { - beaconDest.emplace_back(AF_INET, addr.c_str(), effective.udp_port); + beaconDest.emplace_back(addr.c_str(), effective.udp_port); } }); @@ -706,10 +720,13 @@ void Server::Pvt::doBeacons(short evt) assert(M.good() && H.good()); for(const auto& dest : beaconDest) { - int ntx = sendto(beaconSender.sock, (char*)beaconMsg.data(), pktlen, 0, &dest->sa, dest.size()); + auto& sender = dest.addr.family()==AF_INET ? beaconSender4 : beaconSender6; + sender.mcast_prep_sendto(dest); + + int ntx = sendto(sender.sock, (char*)beaconMsg.data(), pktlen, 0, &dest.addr->sa, dest.addr.size()); if(ntx<0) { - int err = evutil_socket_geterror(beaconSender.sock); + int err = evutil_socket_geterror(sender.sock); auto lvl = Level::Warn; if(err==EINTR || err==EPERM) lvl = Level::Debug; @@ -721,7 +738,7 @@ void Server::Pvt::doBeacons(short evt) unsigned(ntx), unsigned(pktlen)); } else { - log_debug_printf(serverio, "Beacon tx to %s\n", dest.tostring().c_str()); + log_debug_printf(serverio, "Beacon tx to %s\n", std::string(SB()<server->acceptor_loop.base, sock, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS), - SockAddr(peer, socklen)) + SockAddr(peer)) ,iface(iface) { log_debug_printf(connio, "Client %s connects\n", peerName.c_str()); @@ -379,14 +379,21 @@ void ServerConn::bevWrite() } -ServIface::ServIface(const std::string& addr, unsigned short port, server::Server::Pvt *server, bool fallback) +ServIface::ServIface(const SockAddr &addr, server::Server::Pvt *server, bool fallback) :server(server) - ,bind_addr(AF_INET, addr.c_str(), port) - ,sock(AF_INET, SOCK_STREAM, 0) + ,bind_addr(addr) { server->acceptor_loop.assertInLoop(); auto orig_port = bind_addr.port(); + if(server->canIPv6 && bind_addr.family()==AF_INET && bind_addr.isAny()) { + // promote to IPv6 with IPv4 support + bind_addr = SockAddr::any(AF_INET6, bind_addr.port()); + log_debug_printf(connsetup, "Promote 0.0.0.0 -> [::]%s", "\n"); + } + + sock = evsocket(bind_addr.family(), SOCK_STREAM, 0); + if(evutil_make_listen_socket_reuseable(sock.sock)) log_warn_printf(connsetup, "Unable to make socket reusable%s", "\n"); @@ -396,9 +403,12 @@ ServIface::ServIface(const std::string& addr, unsigned short port, server::Serve sock.bind(bind_addr); } catch(std::system_error& e) { if(fallback && e.code().value()==SOCK_EADDRINUSE) { + log_debug_printf(connsetup, "Address %s in use", bind_addr.tostring().c_str()); bind_addr.setPort(0); + fallback = false; continue; } + log_err_printf(connsetup, "Bind to %s fails", bind_addr.tostring().c_str()); throw; } break; @@ -426,11 +436,6 @@ void ServIface::onConnS(struct evconnlistener *listener, evutil_socket_t sock, s { auto self = static_cast(raw); try { - if(peer->sa_family!=AF_INET) { - log_crit_printf(connsetup, "Interface %s Rejecting !ipv4 client\n", self->name.c_str()); - evutil_closesocket(sock); - return; - } auto conn(std::make_shared(self, sock, peer, socklen)); self->server->connections[conn.get()] = std::move(conn); }catch(std::exception& e){ diff --git a/src/serverconn.h b/src/serverconn.h index 2103cb9..0895792 100644 --- a/src/serverconn.h +++ b/src/serverconn.h @@ -169,7 +169,7 @@ struct ServIface evsocket sock; evlisten listener; - ServIface(const std::string& addr, unsigned short port, server::Server::Pvt *server, bool fallback); + ServIface(const SockAddr &addr, server::Server::Pvt *server, bool fallback); static void onConnS(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *peer, int socklen, void *raw); }; @@ -200,6 +200,7 @@ using namespace impl; struct Server::Pvt { SockAttach attach; + const bool canIPv6; std::weak_ptr internal_self; @@ -218,13 +219,13 @@ struct Server::Pvt evbase acceptor_loop; std::list > listeners; - std::vector beaconDest; + std::vector beaconDest; std::vector ignoreList; std::list interfaces; std::map > connections; - evsocket beaconSender; + evsocket beaconSender4, beaconSender6; evevent beaconTimer; std::vector searchReply; diff --git a/src/udp_collector.cpp b/src/udp_collector.cpp index a7faf67..e848461 100644 --- a/src/udp_collector.cpp +++ b/src/udp_collector.cpp @@ -36,9 +36,9 @@ struct UDPCollector : public UDPManager::Search, { UDPManager::Pvt* const manager; SockAddr bind_addr; // address our socket is bound to - SockAddr lo_mcast_addr; // destination endpoint for local mcast forwarding + SockEndpoint lo_mcast_addr; // destination endpoint for local mcast forwarding SockAddr lo_addr; - std::set> mcast_grps; // mcast group+iface pairs which our socket has joined + std::set mcast_grps; // mcast group+iface pairs which our socket has joined std::string name; evsocket sock; evevent rx; @@ -50,7 +50,7 @@ struct UDPCollector : public UDPManager::Search, std::set listeners; - UDPCollector(UDPManager::Pvt* manager, uint16_t port); + UDPCollector(UDPManager::Pvt* manager, int af, uint16_t port); ~UDPCollector(); void addListener(UDPListener *l); @@ -93,13 +93,15 @@ public: struct UDPManager::Pvt { evbase loop; - IfaceMap ifmap; + IfaceMap& ifmap; // only manipulate from loop worker thread - std::map collectors; + // key'd by address family and port# + std::map, UDPCollector*> collectors; Pvt() :loop("PVXUDP", epicsThreadPriorityCAServerLow-4) + ,ifmap(IfaceMap::instance()) {} ~Pvt() { @@ -107,12 +109,12 @@ struct UDPManager::Pvt { assert(collectors.empty()); } - std::shared_ptr collect(const SockAddr& dest) + std::shared_ptr collect(const SockEndpoint& dest) { std::shared_ptr collector; - if(dest.port()!=0) { - auto it = collectors.find(dest.port()); + if(dest.addr.port()!=0) { + auto it = collectors.find(std::make_pair(dest.addr.family(), dest.addr.port())); if(it!=collectors.end()) { try { collector = it->second->shared_from_this(); @@ -123,26 +125,26 @@ struct UDPManager::Pvt { } if(!collector) { - collector.reset(new UDPCollector(this, dest.port())); + collector.reset(new UDPCollector(this, dest.addr.family(), dest.addr.port())); } return collector; } }; -UDPCollector::UDPCollector(UDPManager::Pvt *manager, uint16_t requested_port) +UDPCollector::UDPCollector(UDPManager::Pvt *manager, int af, uint16_t requested_port) :manager(manager) - ,bind_addr(SockAddr::any(AF_INET, requested_port)) - ,lo_mcast_addr(bind_addr.family(), "224.0.0.128") + ,bind_addr(SockAddr::any(af, requested_port)) + ,lo_mcast_addr("224.0.0.128,1@127.0.0.1") ,lo_addr(SockAddr::loopback(bind_addr.family())) - ,sock(bind_addr.family(), SOCK_DGRAM, 0) + ,sock(af, SOCK_DGRAM, 0) ,rx(event_new(manager->loop.base, sock.sock, EV_READ|EV_PERSIST, &handle_static, this)) ,beaconMsg(src) { manager->loop.assertInLoop(); epicsSocketEnableAddressUseForDatagramFanout(sock.sock); - enable_SO_RXQ_OVFL(sock.sock); - enable_IP_PKTINFO(sock.sock); + sock.enable_SO_RXQ_OVFL(); + sock.enable_IP_PKTINFO(); /* Always bind to wildcard to receive all uni/broad/multicast, and also to send them. * Notes: @@ -158,31 +160,32 @@ UDPCollector::UDPCollector(UDPManager::Pvt *manager, uint16_t requested_port) sock.bind(bind_addr); name = "UDP "+bind_addr.tostring(); - lo_mcast_addr.setPort(bind_addr.port()); - lo_addr.setPort(bind_addr.port()); + if(af==AF_INET) { + lo_mcast_addr.addr.setPort(bind_addr.port()); + lo_addr.setPort(bind_addr.port()); - // join local group to receive - sock.mcast_join(lo_mcast_addr, lo_addr); - // setup for re-transmit - sock.mcast_ttl(1); // make default explicit, we will only send to lo_mcast_addr. - sock.mcast_loop(true); - sock.mcast_iface(lo_addr); + // join local group to receive + auto Mem(lo_mcast_addr.resolve()); + sock.mcast_join(Mem); + // setup for re-transmit + sock.mcast_loop(true); - mcast_grps.emplace(lo_mcast_addr, lo_addr); + mcast_grps.emplace(Mem); + } log_info_printf(logsetup, "Bound %d to %s as lo\n", sock.sock, name.c_str()); if(event_add(rx.get(), nullptr)) throw std::runtime_error("Unable to create collector Rx event"); - manager->collectors[bind_addr.port()] = this; + manager->collectors[std::make_pair(af, bind_addr.port())] = this; } UDPCollector::~UDPCollector() { manager->loop.assertInLoop(); - manager->collectors.erase(bind_addr.port()); + manager->collectors.erase(std::make_pair(bind_addr.family(), bind_addr.port())); // we should only be destroyed after that last listener has removed itself assert(listeners.empty()); @@ -191,23 +194,24 @@ UDPCollector::~UDPCollector() void UDPCollector::addListener(UDPListener *l) { - for(const auto& mcast : l->mcasts) { - const auto tup(std::make_pair(mcast, l->dest)); - if(mcast_grps.find(tup)==mcast_grps.end()) { - mcast_grps.insert(tup); - - log_debug_printf(logsetup, "collector joining %s on %s\n", - mcast.tostring().c_str(), - l->dest.tostring().c_str()); - - sock.mcast_join(mcast, l->dest); + if(l->dest.addr.isMCast()) { + l->cur = l->dest.resolve(); + auto it(mcast_grps.find(l->cur)); + if(it==mcast_grps.end() && sock.mcast_join(l->cur)) { + mcast_grps.emplace(l->cur); + log_debug_printf(logsetup, "collector joining %s\n", + std::string(SB()<dest).c_str()); } } listeners.insert(l); + + log_debug_printf(logsetup, "Start listening for UDP %s\n", std::string(SB()<dest).c_str()); } void UDPCollector::delListener(UDPListener *l) { + log_debug_printf(logsetup, "Stop listening for UDP %s\n", std::string(SB()<dest).c_str()); + listeners.erase(l); // TODO: bother to cleanup mcast group membership? @@ -236,9 +240,7 @@ bool UDPCollector::handle_one() if(nrx<0) { int err = evutil_socket_geterror(sock.sock); - if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) { - // nothing to do here - } else { + if(err!=SOCK_EWOULDBLOCK && err!=EAGAIN && err!=SOCK_EINTR) { log_warn_printf(logio, "UDP RX Error on %s : %s\n", name.c_str(), evutil_socket_error_to_string(err)); } @@ -246,6 +248,15 @@ bool UDPCollector::handle_one() } + if(dest.family()!=AF_UNSPEC) + dest.setPort(bind_addr.port()); + + if(src.isMCast()) { + // should never happen. It it does, we won't be tricked into amplifying a DDoS. + log_debug_printf(logio, "Ignoring UDP with mcast source %s.\n", src.tostring().c_str()); + return true; + } + log_hex_printf(logio, Level::Debug, rxbuf, nrx, "UDP %d Rx %d, %s -> %s @%u (%s)\n", sock.sock, nrx, src.tostring().c_str(), dest.tostring().c_str(), unsigned(rx.dstif), bind_addr.tostring().c_str()); @@ -280,8 +291,9 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t switch(head.cmd) { case CMD_SEARCH: { + peerVersion = head.version; + uint8_t flags = 0; - SockAddr replyAddr; uint16_t port = 0; from_wire(M, searchID); @@ -292,16 +304,16 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t M.skip(3, __FILE__, __LINE__); // unused/reserved auto save_replyAddr = M.save(); - from_wire(M, replyAddr); + from_wire(M, server); from_wire(M, port); - if(replyAddr.isAny()) { - replyAddr = src; + if(server.isAny()) { + server = src; if(origin==OriginTag) { log_err_printf(logio, "CMD_ORIGIN_TAG search with reply to sender never works%s", "\n"); return; } } - replyAddr.setPort(port); + server.setPort(port); if(M.good() && origin==Loopback && (flags&pva_search_flags::Unicast) && dest.family()!=AF_UNSPEC) { assert(buf==&this->buf[cmd_origin_tag_size]); @@ -310,7 +322,7 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t // recipient of forwarded message must use, and trust, replyAddr in body :( { FixedBuf R(M.be, save_replyAddr, 16u); - to_wire(R, replyAddr); + to_wire(R, server); assert(R.good()); } forwardM(dest, buf, nrx); @@ -318,18 +330,18 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t } // so far, only "tcp" transport has ever been seen. - // however, we will consider and ignore any others which might appear - bool foundtcp = false; + // however, we will pass through others which might appear + otherproto.clear(); Size nproto{0}; from_wire(M, nproto); for(size_t i=0; i=3 && nchar.size==3 && M[0]=='t' && M[1]=='c' && M[2]=='p'; - M.skip(nchar.size, __FILE__, __LINE__); + std::string prot; + from_wire(M, prot); + if(prot=="tcp") { + protoTCP = true; + } else { + otherproto.push_back(prot); + } } // one Search message can include many PV names. @@ -348,14 +360,14 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t from_wire(M, chlen); // inject nil for previous PV name *mundge = '\0'; - if(foundtcp && chlen.size<=M.size() && M.good()) { + if(protoTCP && chlen.size<=M.size() && M.good()) { names.push_back(UDPManager::Search::Name{reinterpret_cast(M.save()), id}); } M.skip(chlen.size, __FILE__, __LINE__); } // used by our reply() - src = replyAddr; + src = server; if(M.good()) { // ensure nil for final PV name @@ -372,6 +384,8 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t } case CMD_BEACON: { + beaconMsg.peerVersion = head.version; + uint16_t port = 0; _from_wire<12>(M, &beaconMsg.guid[0], false, __FILE__, __LINE__); @@ -389,6 +403,8 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t if(M.good()) { for(auto L : listeners) { + if(L->dest.addr.compare(dest)!=0) + break; // TODO: check interface index against L->cur if(L->beaconCB) { (L->beaconCB)(beaconMsg); } @@ -406,7 +422,7 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t // only accept when sent to the mcast address from the loopback address // since we only join the mcast group on loopback this will hopefully // frustrate attempts to inject CMD_ORIGIN_TAG externally. - if(M.good() && origin==Loopback && dest.compare(lo_mcast_addr,false)==0 && src.isLO()) { + if(M.good() && origin==Loopback && dest.compare(lo_mcast_addr.addr,false)==0 && src.isLO()) { originaddr.setPort(bind_addr.port()); process_one(originaddr, M.save(), M.size(), OriginTag); @@ -417,7 +433,7 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t originaddr.tostring().c_str(), M.good() ? 'T' : 'F', origin==Loopback ? 'T' : 'F', - dest.compare(lo_mcast_addr,false)==0 ? 'T' : 'F', + dest.compare(lo_mcast_addr.addr,false)==0 ? 'T' : 'F', src.isLO() ? 'T' : 'F'); break; @@ -445,7 +461,8 @@ void UDPCollector::forwardM(const SockAddr& origin, const uint8_t *pbuf, size_t assert(M.save()==&buf[cmd_origin_tag_size]); } - src = lo_mcast_addr; + sock.mcast_prep_sendto(lo_mcast_addr); + src = lo_mcast_addr.addr; reply(&buf[0], cmd_origin_tag_size+plen); } @@ -518,7 +535,7 @@ void UDPManager::cleanup() udp_gbl = nullptr; } -std::unique_ptr UDPManager::onBeacon(SockAddr& dest, +std::unique_ptr UDPManager::onBeacon(SockEndpoint &dest, std::function&& cb) { if(!pvt) @@ -536,7 +553,16 @@ std::unique_ptr UDPManager::onBeacon(SockAddr& dest, return ret; } -std::unique_ptr UDPManager::onSearch(SockAddr& dest, +std::unique_ptr UDPManager::onBeacon(SockAddr& dest, + std::function&& cb) +{ + SockEndpoint ep(dest); + auto ret(onBeacon(ep, std::move(cb))); + dest = ep.addr; + return ret; +} + +std::unique_ptr UDPManager::onSearch(SockEndpoint &dest, std::function&& cb) { if(!pvt) @@ -554,6 +580,15 @@ std::unique_ptr UDPManager::onSearch(SockAddr& dest, return ret; } +std::unique_ptr UDPManager::onSearch(SockAddr& dest, + std::function&& cb) +{ + SockEndpoint ep(dest); + auto ret(onSearch(ep, std::move(cb))); + dest = ep.addr; + return ret; +} + void UDPManager::sync() { if(!pvt) @@ -562,15 +597,16 @@ void UDPManager::sync() pvt->loop.sync(); } -UDPListener::UDPListener(const std::shared_ptr &manager, SockAddr &dest) +UDPListener::UDPListener(const std::shared_ptr &manager, SockEndpoint &ep) :manager(manager) - ,dest(dest) + ,collector(manager->collect(ep)) + ,dest([&ep, this]() -> SockEndpoint{ + ep.addr.setPort(collector->bind_addr.port()); + return ep; + }()) ,active(false) { manager->loop.assertInLoop(); - - collector = manager->collect(dest); - dest.setPort(collector->bind_addr.port()); } UDPListener::~UDPListener() @@ -585,17 +621,6 @@ UDPListener::~UDPListener() }); } -void UDPListener::addMCast(const SockAddr& mcast) -{ - manager->loop.call([this, &mcast](){ - if(active) - throw std::logic_error("must addMCast() before start()"); - - collector->mcast_grps.emplace(mcast.withPort(collector->bind_addr.port()), - dest); - }); -} - void UDPListener::start(bool s) { manager->loop.call([this, s](){ diff --git a/src/udp_collector.h b/src/udp_collector.h index f830ca5..e3622df 100644 --- a/src/udp_collector.h +++ b/src/udp_collector.h @@ -39,17 +39,23 @@ struct PVXS_API UDPManager std::string proto; SockAddr server; ServerGUID guid; + uint8_t peerVersion; Beacon(const SockAddr& src) :src(src) {} }; //! Create subscription for Beacon messages. //! Must call UDPListener::start() + std::unique_ptr onBeacon(SockEndpoint& dest, + std::function&& cb); std::unique_ptr onBeacon(SockAddr& dest, std::function&& cb); struct PVXS_API Search { + std::vector otherproto; // any protocols other than "tcp" SockAddr src; SockAddr server; uint32_t searchID; + uint8_t peerVersion; + bool protoTCP; // included protocol "tcp" bool mustReply; struct Name { const char *name; @@ -66,6 +72,8 @@ struct PVXS_API UDPManager }; //! Create subscription for Search messages. //! Must call UDPListener::start() + std::unique_ptr onSearch(SockEndpoint& dest, + std::function&& cb); std::unique_ptr onSearch(SockAddr& dest, std::function&& cb); @@ -91,8 +99,8 @@ class PVXS_API UDPListener std::function beaconCB; const std::shared_ptr manager; std::shared_ptr collector; - const SockAddr dest; - std::set mcasts; + const SockEndpoint dest; + MCastMembership cur; bool active; INST_COUNTER(UDPListener); @@ -100,12 +108,10 @@ class PVXS_API UDPListener friend struct UDPCollector; friend struct UDPManager; - UDPListener(const std::shared_ptr& manager, SockAddr& dest); + UDPListener(const std::shared_ptr& manager, SockEndpoint& dest); public: ~UDPListener(); - void addMCast(const SockAddr& mcast); - void start(bool s=true); inline void stop() { start(false); } }; diff --git a/src/unittest.cpp b/src/unittest.cpp index f4d9415..901864f 100644 --- a/src/unittest.cpp +++ b/src/unittest.cpp @@ -50,6 +50,7 @@ void cleanup_for_valgrind() #endif impl::logger_shutdown(); impl::UDPManager::cleanup(); + IfaceMap::cleanup(); } testCase::testCase() diff --git a/src/util.cpp b/src/util.cpp index 48704da..9c12afc 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -258,8 +258,8 @@ SigInt::~SigInt() SockAddr::SockAddr(int af) + :store{} { - memset(&store, 0, sizeof(store)); store.sa.sa_family = af; if(af!=AF_INET #ifdef AF_INET6 @@ -269,21 +269,26 @@ SockAddr::SockAddr(int af) throw std::invalid_argument("Unsupported address family"); } -SockAddr::SockAddr(int af, const char *address, unsigned short port) - :SockAddr(af) +SockAddr::SockAddr(const char *address, unsigned short port) + :SockAddr(AF_UNSPEC) { setAddress(address, port); } -SockAddr::SockAddr(const sockaddr *addr, ev_socklen_t len) - :SockAddr(addr->sa_family) +SockAddr::SockAddr(const sockaddr *addr) + :SockAddr(addr ? addr->sa_family : AF_UNSPEC) { - if(len<0 || len>ev_socklen_t(size())) - throw std::invalid_argument("Truncated Address"); - memcpy(&store, addr, len); + if(!addr) + return; // treat NULL as AF_UNSPEC + + if(family()!=AF_UNSPEC && family()!=AF_INET && family()!=AF_INET6) + throw std::invalid_argument("Unsupported address family"); + + if(family()!=AF_UNSPEC) + memcpy(&store, addr, family()==AF_INET ? sizeof(sockaddr_in) : sizeof(sockaddr_in6)); } -size_t SockAddr::size() const +size_t SockAddr::size() const noexcept { switch(store.sa.sa_family) { case AF_INET: return sizeof(store.in); @@ -295,7 +300,7 @@ size_t SockAddr::size() const } } -unsigned short SockAddr::port() const +unsigned short SockAddr::port() const noexcept { switch(store.sa.sa_family) { case AF_INET: return ntohs(store.in.sin_port); @@ -318,17 +323,97 @@ void SockAddr::setPort(unsigned short port) } } -void SockAddr::setAddress(const char *name, unsigned short port) +void SockAddr::setAddress(const char *name, unsigned short defport) { - SockAddr temp(AF_INET); - if(aToIPAddr(name, port, &temp->in)) - throw std::runtime_error(std::string("Unable to parse as IP address: ")+name); - if(temp.port()==0) - temp.setPort(port); + assert(name); + // too bad evutil_parse_sockaddr_port() treats ":0" as an error... + + /* looking for + * [ipv6]:port + * ipv6 + * [ipv6] + * ipv4:port + * ipv4 + */ + // TODO: could optimize to find all of these with a single loop + const char *firstc = strchr(name, ':'), + *lastc = strrchr(name, ':'), + *openb = strchr(name, '['), + *closeb = strrchr(name, ']'); + + if(!openb ^ !closeb) { + // '[' w/o ']' or vis. versa + throw std::runtime_error(SB()<<"IPv6 with mismatched brackets \""<sa.sa_family = AF_INET; + sockaddr = (void*)&temp->in.sin_addr.s_addr; + + } else if(firstc && firstc==lastc && !openb) { + // no bracket and only one ':' + // ipv4 w/ port + size_t addrlen = firstc-name; + if(addrlen >= sizeof(scratch)) + throw std::runtime_error(SB()<<"IPv4 address too long \""<sa.sa_family = AF_INET; + sockaddr = (void*)&temp->in.sin_addr.s_addr; + + } else if(firstc && firstc!=lastc && !openb) { + // no bracket and more than one ':' + // bare ipv6 + addr = name; + port = nullptr; + temp->sa.sa_family = AF_INET6; + sockaddr = (void*)&temp->in6.sin6_addr; + + } else if(openb) { + // brackets + // ipv6, maybe with port + size_t addrlen = closeb-openb-1u; + if(addrlen >= sizeof(scratch)) + throw std::runtime_error(SB()<<"IPv6 address too long \""< closeb) + port = lastc+1; + else + port = nullptr; + temp->sa.sa_family = AF_INET6; + sockaddr = (void*)&temp->in6.sin6_addr; + + } else { + throw std::runtime_error(SB()<<"Invalid IP address form \""<sa.sa_family, addr, sockaddr)<=0) + throw std::runtime_error(SB()<<"Not a valid IP address \""<(port)); + else + temp.setPort(defport); + (*this) = temp; } -bool SockAddr::isAny() const +bool SockAddr::isAny() const noexcept { switch(store.sa.sa_family) { case AF_INET: return store.in.sin_addr.s_addr==htonl(INADDR_ANY); @@ -339,7 +424,7 @@ bool SockAddr::isAny() const } } -bool SockAddr::isLO() const +bool SockAddr::isLO() const noexcept { switch(store.sa.sa_family) { case AF_INET: return store.in.sin_addr.s_addr==htonl(INADDR_LOOPBACK); @@ -350,10 +435,10 @@ bool SockAddr::isLO() const } } -bool SockAddr::isMCast() const +bool SockAddr::isMCast() const noexcept { switch(store.sa.sa_family) { - case AF_INET: return IN_MULTICAST(store.in.sin_addr.s_addr); + case AF_INET: return IN_MULTICAST(ntohl(store.in.sin_addr.s_addr)); #ifdef AF_INET6 case AF_INET6: return IN6_IS_ADDR_MULTICAST(&store.in6.sin6_addr); #endif @@ -361,6 +446,27 @@ bool SockAddr::isMCast() const } } +SockAddr SockAddr::map4to6() const +{ + SockAddr ret; + if(family()==AF_INET) { + static_assert (sizeof(ret->in6.sin6_addr)==16, ""); + ret->in6.sin6_family = AF_INET6; + ret->in6.sin6_addr.s6_addr[10] = 0xff; + ret->in6.sin6_addr.s6_addr[11] = 0xff; + memcpy(&ret->in6.sin6_addr.s6_addr[12], &store.in.sin_addr.s_addr, 4u); + + ret->in6.sin6_port = store.in.sin_port; + + } else if(family()==AF_INET6) { + ret = *this; + + } else { + throw std::logic_error("Invalid address family"); + } + return ret; +} + std::string SockAddr::tostring() const { std::ostringstream strm; @@ -428,12 +534,15 @@ std::ostream& operator<<(std::ostream& strm, const SockAddr& addr) char buf[INET6_ADDRSTRLEN+1]; if(evutil_inet_ntop(AF_INET6, &addr->in6.sin6_addr, buf, sizeof(buf))) { buf[sizeof(buf)-1] = '\0'; // paranoia + strm<<'['<"; } - strm<in6.sin6_port)) - strm<<':'<in6.sin6_port); + if(addr->in6.sin6_scope_id) + strm<<"%"<in6.sin6_scope_id; + if(auto port = ntohs(addr->in6.sin6_port)) + strm<<':'< diff --git a/test/testsock.cpp b/test/testsock.cpp index 853eef9..7764894 100644 --- a/test/testsock.cpp +++ b/test/testsock.cpp @@ -7,6 +7,7 @@ #include #include +#include #include #include @@ -36,35 +37,49 @@ void test_ifacemap() { testDiag("Enter %s", __func__); - impl::IfaceMap ifs; - ifs.refresh(); + auto& ifs = IfaceMap::instance(); - testFalse(ifs.info.empty())<<" found "< G(ifs.lock); // since we are playing around with the internals... + + ifs.refresh(true); + + testFalse(ifs.byIndex.empty())<<" found "<sa, mcast_addr.size()); + int ret = sendto(B.sock, (char*)msg, sizeof(msg), 0, &mcast_addr.addr->sa, mcast_addr.addr.size()); testEq(ret, (int)sizeof(msg))<<"Send test"; uint8_t rxbuf[8] = {}; @@ -134,7 +148,7 @@ void test_local_mcast() recvfromx rx{A.sock, (char*)rxbuf, sizeof(rxbuf), &src, &dest}; ret = rx.call(); if(dest.family()==AF_INET) - dest.setPort(mcast_addr.port()); + dest.setPort(mcast_addr.addr.port()); testTrue(ret>=0 && rx.dstif>0 && ifinfo.has_address(rx.dstif, sender_addr)) <<" received on index "<sa, lo.size()); + auto ret = sendto(TX.sock, msg, msglen, 0, &mcast_addr.addr->sa, mcast_addr.addr.size()); testEq(ret, int(msglen))<<" sendto("< "<