IPv6+mcast support
This commit is contained in:
+77
-45
@@ -468,11 +468,15 @@ Value buildCAMethod()
|
||||
}
|
||||
|
||||
ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop)
|
||||
:effective(conf)
|
||||
:canIPv6(evsocket::canIPv6())
|
||||
,ifmap(IfaceMap::instance())
|
||||
,effective(conf)
|
||||
,caMethod(buildCAMethod())
|
||||
,searchTx(AF_INET, SOCK_DGRAM, 0)
|
||||
,searchTx4(AF_INET, SOCK_DGRAM, 0)
|
||||
,searchTx6(AF_INET6, SOCK_DGRAM, 0)
|
||||
,tcp_loop(tcp_loop)
|
||||
,searchRx(event_new(tcp_loop.base, searchTx.sock, EV_READ|EV_PERSIST, &ContextImpl::onSearchS, this))
|
||||
,searchRx4(event_new(tcp_loop.base, searchTx4.sock, EV_READ|EV_PERSIST, &ContextImpl::onSearchS, this))
|
||||
,searchRx6(event_new(tcp_loop.base, searchTx6.sock, EV_READ|EV_PERSIST, &ContextImpl::onSearchS, this))
|
||||
,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &ContextImpl::tickSearchS, this))
|
||||
,manager(UDPManager::instance())
|
||||
,beaconCleaner(event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &ContextImpl::tickBeaconCleanS, this))
|
||||
@@ -484,52 +488,60 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop)
|
||||
searchBuckets.resize(nBuckets);
|
||||
|
||||
std::set<SockAddr, SockAddrOnlyLess> bcasts;
|
||||
for(auto& addr : searchTx.broadcasts()) {
|
||||
for(auto& addr : searchTx4.broadcasts()) {
|
||||
addr.setPort(0u);
|
||||
bcasts.insert(addr);
|
||||
}
|
||||
|
||||
searchTx6.ipv6_only();
|
||||
|
||||
{
|
||||
osiSockAddr any{};
|
||||
any.ia.sin_family = AF_INET;
|
||||
if(bind(searchTx.sock, &any.sa, sizeof(any.ia)))
|
||||
auto any(SockAddr::any(searchTx4.af));
|
||||
if(bind(searchTx4.sock, &any->sa, any.size()))
|
||||
throw std::runtime_error("Unable to bind random UDP port");
|
||||
|
||||
socklen_t alen = sizeof(any);
|
||||
if(getsockname(searchTx.sock, &any.sa, &alen))
|
||||
socklen_t alen = any.capacity();
|
||||
if(getsockname(searchTx4.sock, &any->sa, &alen))
|
||||
throw std::runtime_error("Unable to readback random UDP port");
|
||||
|
||||
searchRxPort = ntohs(any.ia.sin_port);
|
||||
searchRxPort = any.port();
|
||||
|
||||
log_debug_printf(setup, "Using UDP Rx port %u\n", searchRxPort);
|
||||
}
|
||||
|
||||
{
|
||||
int val = 1;
|
||||
if(setsockopt(searchTx.sock, SOL_SOCKET, SO_BROADCAST, (char *)&val, sizeof(val)))
|
||||
log_err_printf(setup, "Unable to setup beacon sender SO_BROADCAST: %d\n", SOCKERRNO);
|
||||
auto any(SockAddr::any(searchTx6.af, searchRxPort));
|
||||
if(bind(searchTx6.sock, &any->sa, any.size()))
|
||||
throw std::runtime_error("Unable to bind random UDP6 port");
|
||||
}
|
||||
enable_SO_RXQ_OVFL(searchTx.sock);
|
||||
|
||||
searchTx4.set_broadcast(true);
|
||||
searchTx4.enable_SO_RXQ_OVFL();
|
||||
searchTx6.enable_SO_RXQ_OVFL();
|
||||
|
||||
for(auto& addr : effective.addressList) {
|
||||
SockAddr saddr(AF_INET);
|
||||
SockEndpoint ep;
|
||||
try {
|
||||
saddr.setAddress(addr.c_str(), effective.udp_port);
|
||||
}catch(std::runtime_error& e) {
|
||||
log_err_printf(setup, "%s Ignoring %s\n", e.what(), addr.c_str());
|
||||
ep = SockEndpoint(addr, effective.udp_port);
|
||||
}catch(std::exception& e){
|
||||
log_warn_printf(setup, "%s Ignoring malformed address %s\n", e.what(), addr.c_str());
|
||||
continue;
|
||||
}
|
||||
// if !bcast and !mcast
|
||||
auto isucast = bcasts.find(saddr)==bcasts.end() && !saddr.isMCast();
|
||||
assert(ep.addr.family()==AF_INET || ep.addr.family()==AF_INET6);
|
||||
|
||||
log_info_printf(io, "Searching to %s%s\n", saddr.tostring().c_str(), (isucast?" unicast":""));
|
||||
searchDest.emplace_back(saddr, isucast);
|
||||
// if !bcast and !mcast
|
||||
auto isucast = !ep.addr.isMCast();
|
||||
|
||||
if(isucast && ep.addr.family()==AF_INET && bcasts.find(ep.addr)!=bcasts.end())
|
||||
isucast = false;
|
||||
|
||||
log_info_printf(io, "Searching to %s%s\n", std::string(SB()<<ep).c_str(), (isucast?" unicast":""));
|
||||
searchDest.emplace_back(ep, isucast);
|
||||
}
|
||||
|
||||
for(auto& addr : effective.nameServers) {
|
||||
SockAddr saddr(AF_INET);
|
||||
SockAddr saddr;
|
||||
try {
|
||||
saddr.setAddress(addr.c_str(), 5075);
|
||||
saddr.setAddress(addr.c_str(), effective.tcp_port);
|
||||
}catch(std::runtime_error& e) {
|
||||
log_err_printf(setup, "%s Ignoring...\n", e.what());
|
||||
}
|
||||
@@ -538,12 +550,22 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop)
|
||||
nameServers.emplace_back(saddr, nullptr);
|
||||
}
|
||||
|
||||
const auto cb([this](const UDPManager::Beacon& msg) {
|
||||
onBeacon(msg);
|
||||
});
|
||||
|
||||
for(auto& iface : effective.interfaces) {
|
||||
SockAddr addr(AF_INET, iface.c_str(), effective.udp_port);
|
||||
log_info_printf(io, "Listening for beacons on %s\n", addr.tostring().c_str());
|
||||
beaconRx.push_back(manager.onBeacon(addr, [this](const UDPManager::Beacon& msg) {
|
||||
onBeacon(msg);
|
||||
}));
|
||||
SockEndpoint addr(iface.c_str(), effective.udp_port);
|
||||
beaconRx.push_back(manager.onBeacon(addr, cb));
|
||||
log_info_printf(io, "Listening for beacons on %s\n", addr.addr.tostring().c_str());
|
||||
|
||||
if(addr.addr.family()==AF_INET && addr.addr.isAny()) {
|
||||
// if listening on 0.0.0.0, also listen on [::]
|
||||
auto any6(addr);
|
||||
any6.addr = SockAddr::any(AF_INET6);
|
||||
|
||||
beaconRx.push_back(manager.onBeacon(any6, cb));
|
||||
}
|
||||
}
|
||||
|
||||
for(auto& listener : beaconRx) {
|
||||
@@ -552,8 +574,10 @@ ContextImpl::ContextImpl(const Config& conf, const evbase& tcp_loop)
|
||||
|
||||
if(event_add(searchTimer.get(), &bucketInterval))
|
||||
log_err_printf(setup, "Error enabling search timer\n%s", "");
|
||||
if(event_add(searchRx.get(), nullptr))
|
||||
log_err_printf(setup, "Error enabling search RX\n%s", "");
|
||||
if(event_add(searchRx4.get(), nullptr))
|
||||
log_err_printf(setup, "Error enabling search RX4\n%s", "");
|
||||
if(event_add(searchRx6.get(), nullptr))
|
||||
log_err_printf(setup, "Error enabling search RX6\n%s", "");
|
||||
if(event_add(beaconCleaner.get(), &beaconCleanInterval))
|
||||
log_err_printf(setup, "Error enabling beacon clean timer on\n%s", "");
|
||||
if(event_add(cacheCleaner.get(), &channelCacheCleanInterval))
|
||||
@@ -586,7 +610,8 @@ void ContextImpl::close()
|
||||
// terminate all active connections
|
||||
tcp_loop.call([this]() {
|
||||
(void)event_del(searchTimer.get());
|
||||
(void)event_del(searchRx.get());
|
||||
(void)event_del(searchRx4.get());
|
||||
(void)event_del(searchRx6.get());
|
||||
(void)event_del(beaconCleaner.get());
|
||||
(void)event_del(cacheCleaner.get());
|
||||
|
||||
@@ -670,7 +695,7 @@ static
|
||||
void procSearchReply(ContextImpl& self, const SockAddr& src, Buffer& M, bool istcp)
|
||||
{
|
||||
ServerGUID guid;
|
||||
SockAddr serv(AF_INET);
|
||||
SockAddr serv;
|
||||
uint16_t port = 0;
|
||||
uint8_t found = 0u;
|
||||
uint32_t seq = 0u;
|
||||
@@ -762,12 +787,12 @@ void procSearchReply(ContextImpl& self, const SockAddr& src, Buffer& M, bool ist
|
||||
|
||||
}
|
||||
|
||||
bool ContextImpl::onSearch()
|
||||
bool ContextImpl::onSearch(evutil_socket_t fd)
|
||||
{
|
||||
searchMsg.resize(0x10000);
|
||||
SockAddr src;
|
||||
|
||||
recvfromx rx{searchTx.sock, (char*)&searchMsg[0], searchMsg.size()-1, &src};
|
||||
recvfromx rx{fd, (char*)&searchMsg[0], searchMsg.size()-1, &src};
|
||||
const int nrx = rx.call();
|
||||
|
||||
if(nrx>=0 && rx.ndrop!=0 && prevndrop!=rx.ndrop) {
|
||||
@@ -776,7 +801,7 @@ bool ContextImpl::onSearch()
|
||||
}
|
||||
|
||||
if(nrx<0) {
|
||||
int err = evutil_socket_geterror(searchTx.sock);
|
||||
int err = evutil_socket_geterror(fd);
|
||||
if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) {
|
||||
// nothing to do here
|
||||
} else {
|
||||
@@ -844,7 +869,7 @@ void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw)
|
||||
// limit number of packets processed before going back to the reactor
|
||||
unsigned i;
|
||||
const unsigned limit = 40;
|
||||
for(i=0; i<limit && static_cast<ContextImpl*>(raw)->onSearch(); i++) {}
|
||||
for(i=0; i<limit && static_cast<ContextImpl*>(raw)->onSearch(fd); i++) {}
|
||||
log_debug_printf(io, "UDP search processed %u/%u\n", i, limit);
|
||||
|
||||
}catch(std::exception& e){
|
||||
@@ -976,20 +1001,27 @@ void ContextImpl::tickSearch(bool discover)
|
||||
to_wire(H, Header{CMD_SEARCH, 0, uint32_t(consumed-8u)});
|
||||
}
|
||||
for(auto& pair : searchDest) {
|
||||
if(pair.second)
|
||||
auto& dest = pair.first.addr.family()==AF_INET ? searchTx4 : searchTx6;
|
||||
|
||||
if(pair.second) {
|
||||
*pflags |= pva_search_flags::Unicast;
|
||||
else
|
||||
|
||||
} else {
|
||||
*pflags &= ~pva_search_flags::Unicast;
|
||||
|
||||
int ntx = sendto(searchTx.sock, (char*)searchMsg.data(), consumed, 0, &pair.first->sa, pair.first.size());
|
||||
dest.mcast_prep_sendto(pair.first);
|
||||
}
|
||||
|
||||
int ntx = sendto(dest.sock, (char*)searchMsg.data(), consumed, 0,
|
||||
&pair.first.addr->sa, pair.first.addr.size());
|
||||
|
||||
if(ntx<0) {
|
||||
int err = evutil_socket_geterror(searchTx.sock);
|
||||
int err = evutil_socket_geterror(dest.sock);
|
||||
auto lvl = Level::Warn;
|
||||
if(err==EINTR || err==EPERM)
|
||||
lvl = Level::Debug;
|
||||
log_printf(io, lvl, "Search tx error (%d) %s\n",
|
||||
err, evutil_socket_error_to_string(err));
|
||||
log_printf(io, lvl, "Search tx %s error (%d) %s\n",
|
||||
pair.first.addr.tostring().c_str(), err, evutil_socket_error_to_string(err));
|
||||
|
||||
} else if(unsigned(ntx)<consumed) {
|
||||
log_warn_printf(io, "Search truncated %u < %u",
|
||||
@@ -998,7 +1030,7 @@ void ContextImpl::tickSearch(bool discover)
|
||||
} else {
|
||||
log_hex_printf(io, Level::Debug, (char*)searchMsg.data(), consumed,
|
||||
"Search to %s %s\n",
|
||||
pair.first.tostring().c_str(),
|
||||
std::string(SB()<<pair.first).c_str(),
|
||||
pair.second ? "ucast" : "bcast");
|
||||
}
|
||||
}
|
||||
|
||||
+6
-4
@@ -226,6 +226,8 @@ private:
|
||||
struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
|
||||
{
|
||||
SockAttach attach;
|
||||
const bool canIPv6;
|
||||
IfaceMap& ifmap;
|
||||
|
||||
// "const" after ctor
|
||||
Config effective;
|
||||
@@ -235,7 +237,7 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
|
||||
uint32_t nextCID=0x12345678;
|
||||
uint32_t prevndrop = 0u;
|
||||
|
||||
evsocket searchTx;
|
||||
evsocket searchTx4, searchTx6;
|
||||
uint16_t searchRxPort;
|
||||
|
||||
std::vector<ServerGUID> ignoreServerGUIDs;
|
||||
@@ -256,7 +258,7 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
|
||||
std::vector<uint8_t> searchMsg;
|
||||
|
||||
// search destination address and whether to set the unicast flag
|
||||
std::vector<std::pair<SockAddr, bool>> searchDest;
|
||||
std::vector<std::pair<SockEndpoint, bool>> searchDest;
|
||||
|
||||
size_t currentBucket = 0u;
|
||||
std::vector<std::list<std::weak_ptr<Channel>>> searchBuckets;
|
||||
@@ -274,7 +276,7 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
|
||||
std::vector<std::pair<SockAddr, std::shared_ptr<Connection>>> nameServers;
|
||||
|
||||
evbase tcp_loop;
|
||||
const evevent searchRx;
|
||||
const evevent searchRx4, searchRx6;
|
||||
const evevent searchTimer;
|
||||
|
||||
// beacon handling done on UDP worker.
|
||||
@@ -302,7 +304,7 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
|
||||
|
||||
void onBeacon(const UDPManager::Beacon& msg);
|
||||
|
||||
bool onSearch();
|
||||
bool onSearch(evutil_socket_t fd);
|
||||
static void onSearchS(evutil_socket_t fd, short evt, void *raw);
|
||||
void tickSearch(bool discover);
|
||||
static void tickSearchS(evutil_socket_t fd, short evt, void *raw);
|
||||
|
||||
+237
-40
@@ -20,6 +20,7 @@
|
||||
#include <pvxs/log.h>
|
||||
#include "serverconn.h"
|
||||
#include "clientimpl.h"
|
||||
#include "utilpvt.h"
|
||||
#include "evhelper.h"
|
||||
|
||||
DEFINE_LOGGER(serversetup, "pvxs.server.setup");
|
||||
@@ -28,6 +29,116 @@ DEFINE_LOGGER(config, "pvxs.config");
|
||||
|
||||
namespace pvxs {
|
||||
|
||||
SockEndpoint::SockEndpoint(const char* ep, uint16_t defport)
|
||||
{
|
||||
// <IP46>
|
||||
// <IP46>,<ttl#>
|
||||
// <IP46>@ifacename
|
||||
// <IP46>,<ttl#>@ifacename
|
||||
auto comma = strchr(ep, ',');
|
||||
auto at = strchr(ep, '@');
|
||||
|
||||
if(comma && at && comma > at) {
|
||||
throw std::runtime_error(SB()<<'"'<<escape(ep)<<"\" comma expected before @");
|
||||
}
|
||||
|
||||
if(!comma && !at) {
|
||||
addr.setAddress(ep, defport);
|
||||
|
||||
} else { // comma || at
|
||||
auto firstsep = comma ? comma : at;
|
||||
addr.setAddress(std::string(ep, firstsep-ep), defport);
|
||||
|
||||
if(comma && !at) {
|
||||
ttl = parseTo<int64_t>(comma+1);
|
||||
|
||||
} else if(comma) {
|
||||
ttl = parseTo<int64_t>(std::string(comma+1, at-comma-1));
|
||||
}
|
||||
|
||||
if(at)
|
||||
iface = at+1;
|
||||
}
|
||||
|
||||
auto& ifmap = IfaceMap::instance();
|
||||
|
||||
if(addr.family()==AF_INET6) {
|
||||
if(iface.empty() && addr->in6.sin6_scope_id) {
|
||||
// interface index provide with IPv6 address
|
||||
// we map back to symbolic name for storage
|
||||
iface = ifmap.name_of(addr->in6.sin6_scope_id);
|
||||
}
|
||||
addr->in6.sin6_scope_id = 0;
|
||||
|
||||
} else if(addr.family()==AF_INET && addr.isMCast() && !iface.empty()) {
|
||||
SockAddr ifaddr(AF_INET);
|
||||
|
||||
if(evutil_inet_pton(AF_INET, iface.c_str(), &ifaddr->in.sin_addr.s_addr)==1) {
|
||||
// map interface address to symbolic name
|
||||
|
||||
iface = ifmap.name_of(ifaddr);
|
||||
}
|
||||
}
|
||||
|
||||
if(!iface.empty() && !ifmap.index_of(iface)) {
|
||||
log_warn_printf(config, "Invalid interface address or name: \"%s\"\n", iface.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
MCastMembership SockEndpoint::resolve() const
|
||||
{
|
||||
if(!addr.isMCast())
|
||||
throw std::logic_error("not mcast");
|
||||
|
||||
auto& ifmap = IfaceMap::instance();
|
||||
|
||||
MCastMembership m;
|
||||
m.af = addr.family();
|
||||
if(m.af==AF_INET) {
|
||||
auto& req = m.req.in;
|
||||
req.imr_multiaddr.s_addr = addr->in.sin_addr.s_addr;
|
||||
|
||||
if(!iface.empty()) {
|
||||
auto iface = ifmap.address_of(this->iface);
|
||||
if(iface.family()==AF_INET) {
|
||||
req.imr_interface.s_addr = iface->in.sin_addr.s_addr;
|
||||
}
|
||||
}
|
||||
|
||||
} else if(m.af==AF_INET6) {
|
||||
auto& req = m.req.in6;
|
||||
req.ipv6mr_multiaddr = addr->in6.sin6_addr;
|
||||
|
||||
if(!iface.empty()) {
|
||||
req.ipv6mr_interface = ifmap.index_of(this->iface);
|
||||
if(!req.ipv6mr_interface) {
|
||||
log_warn_printf(config, "Unable to resolve interface '%s'\n", iface.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
throw std::logic_error("Unsupported address family");
|
||||
}
|
||||
return m;
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& strm, const SockEndpoint& addr)
|
||||
{
|
||||
strm<<addr.addr;
|
||||
if(addr.addr.isMCast()) {
|
||||
if(addr.ttl)
|
||||
strm<<','<<addr.ttl;
|
||||
if(!addr.iface.empty())
|
||||
strm<<'@'<<addr.iface;
|
||||
}
|
||||
return strm;
|
||||
}
|
||||
|
||||
bool operator==(const SockEndpoint& lhs, const SockEndpoint& rhs)
|
||||
{
|
||||
return lhs.addr==rhs.addr && lhs.ttl==rhs.ttl && lhs.iface==rhs.iface;
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
/* Historically pvAccessCPP used $EPICS_PVA_CONN_TMO as the period
|
||||
@@ -50,21 +161,18 @@ void split_addr_into(const char* name, std::vector<std::string>& out, const std:
|
||||
pos = end;
|
||||
|
||||
if(start<end) {
|
||||
auto temp = inp.substr(start, end==std::string::npos ? end : end-start);
|
||||
auto temp(inp.substr(start, end==std::string::npos ? end : end-start));
|
||||
try {
|
||||
SockEndpoint ep(temp);
|
||||
if(ep.addr.port()==0)
|
||||
ep.addr.setPort(defaultPort);
|
||||
out.push_back(SB()<<ep);
|
||||
|
||||
sockaddr_in addr = {};
|
||||
if(aToIPAddr(temp.c_str(), defaultPort, &addr)) {
|
||||
} catch(std::exception& e){
|
||||
if(required)
|
||||
throw std::runtime_error(SB()<<"invalid IP or non-existent hostname \""<<temp<<"\"");
|
||||
log_err_printf(config, "%s ignoring invalid '%s'\n", name, temp.c_str());
|
||||
continue;
|
||||
throw std::runtime_error(SB()<<"invalid endpoint \""<<temp<<"\" "<<e.what());
|
||||
log_err_printf(config, "%s ignoring invalid '%s' : %s\n", name, temp.c_str(), e.what());
|
||||
}
|
||||
std::ostringstream strm;
|
||||
uint32_t ip = ntohl(addr.sin_addr.s_addr);
|
||||
strm<<((ip>>24)&0xff)<<'.'<<((ip>>16)&0xff)<<'.'<<((ip>>8)&0xff)<<'.'<<((ip>>0)&0xff);
|
||||
if(addr.sin_port)
|
||||
strm<<':'<<ntohs(addr.sin_port);
|
||||
out.emplace_back(strm.str());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,43 +254,113 @@ struct PickOne {
|
||||
}
|
||||
};
|
||||
|
||||
std::vector<SockEndpoint> parseAddresses(const std::vector<std::string>& addrs, uint16_t defport=0)
|
||||
{
|
||||
std::vector<SockEndpoint> ret;
|
||||
for(const auto& addr : addrs) {
|
||||
try {
|
||||
ret.emplace_back(addr, defport);
|
||||
}catch(std::runtime_error& e){
|
||||
log_warn_printf(config, "Ignoring %s : %s\n", addr.c_str(), e.what());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void printAddresses(std::vector<std::string>& out, std::vector<SockEndpoint>& inp)
|
||||
{
|
||||
std::vector<std::string> temp;
|
||||
temp.reserve(inp.size());
|
||||
|
||||
for(auto& addr : inp) {
|
||||
temp.emplace_back(SB()<<addr);
|
||||
}
|
||||
out = std::move(temp);
|
||||
}
|
||||
|
||||
// Fill out address list by appending broadcast addresses
|
||||
// of any and all local interface addresses already included
|
||||
void expandAddrList(const std::vector<std::string>& ifaces,
|
||||
std::vector<std::string>& addrs)
|
||||
void expandAddrList(const std::vector<SockEndpoint>& ifaces,
|
||||
std::vector<SockEndpoint>& addrs)
|
||||
{
|
||||
SockAttach attach;
|
||||
evsocket dummy(AF_INET, SOCK_DGRAM, 0);
|
||||
|
||||
std::vector<std::string> bcasts;
|
||||
|
||||
for(auto& addr : ifaces) {
|
||||
SockAddr saddr(AF_INET);
|
||||
try {
|
||||
saddr.setAddress(addr.c_str());
|
||||
}catch(std::runtime_error& e){
|
||||
log_warn_printf(config, "%s Ignoring...\n", e.what());
|
||||
for(auto& saddr : ifaces) {
|
||||
if(saddr.addr.family()!=AF_INET)
|
||||
continue;
|
||||
}
|
||||
|
||||
for(auto& addr : dummy.broadcasts(&saddr)) {
|
||||
for(auto& addr : dummy.broadcasts(&saddr.addr)) {
|
||||
addr.setPort(0u);
|
||||
bcasts.push_back(addr.tostring());
|
||||
addrs.emplace_back(addr);
|
||||
}
|
||||
}
|
||||
|
||||
addrs.reserve(addrs.size()+bcasts.size());
|
||||
for(auto& bcast : bcasts) {
|
||||
addrs.push_back(std::move(bcast));
|
||||
}
|
||||
}
|
||||
|
||||
void removeDups(std::vector<std::string>& addrs)
|
||||
void addGroups(std::vector<SockEndpoint>& ifaces,
|
||||
const std::vector<SockEndpoint>& addrs)
|
||||
{
|
||||
auto& ifmap = IfaceMap::instance();
|
||||
std::set<std::string> allifaces;
|
||||
|
||||
for(const auto& addr : addrs) {
|
||||
if(!addr.addr.isMCast())
|
||||
continue;
|
||||
|
||||
if(!addr.iface.empty()) {
|
||||
// interface already specified
|
||||
ifaces.push_back(addr);
|
||||
|
||||
} else {
|
||||
// no interface specified, treat as wildcard
|
||||
if(allifaces.empty())
|
||||
allifaces = ifmap.all_external();
|
||||
|
||||
for(auto& iface : allifaces) {
|
||||
auto ifaceaddr(addr);
|
||||
ifaceaddr.iface = iface;
|
||||
ifaces.push_back(ifaceaddr);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// remove duplicates while preserving order of first appearance
|
||||
template<typename A>
|
||||
void removeDups(std::vector<A>& addrs)
|
||||
{
|
||||
std::sort(addrs.begin(), addrs.end());
|
||||
addrs.erase(std::unique(addrs.begin(), addrs.end()),
|
||||
addrs.end());
|
||||
}
|
||||
|
||||
// special handling for SockEndpoint where duplication is based on
|
||||
// address,interface. Duplicates are combined with the longest TTL.
|
||||
template<>
|
||||
void removeDups(std::vector<SockEndpoint>& addrs)
|
||||
{
|
||||
std::map<std::pair<SockAddr, std::string>, size_t> seen;
|
||||
for(size_t i=0; i<addrs.size(); ) {
|
||||
auto& ep = addrs[i];
|
||||
auto key = std::make_pair(ep.addr, ep.iface);
|
||||
auto it = seen.find(key);
|
||||
if(it==seen.end()) { // first sighting
|
||||
seen[key] = i++;
|
||||
|
||||
} else { // duplicate
|
||||
auto& orig = addrs[it->second];
|
||||
|
||||
if(ep.ttl > orig.ttl) { // w/ longer TTL
|
||||
orig.ttl = ep.ttl;
|
||||
}
|
||||
|
||||
addrs.erase(addrs.begin()+i);
|
||||
// 'ep' and 'orig' are invalidated
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void enforceTimeout(double& tmo)
|
||||
{
|
||||
/* Inactivity timeouts with PVA have a long (and growing) history.
|
||||
@@ -287,22 +465,35 @@ void Config::updateDefs(defs_t& defs) const
|
||||
|
||||
void Config::expand()
|
||||
{
|
||||
if(tcp_port==0)
|
||||
tcp_port = 5075;
|
||||
|
||||
auto ifaces(parseAddresses(interfaces));
|
||||
auto bdest(parseAddresses(beaconDestinations));
|
||||
|
||||
// empty interface address list implies the wildcard
|
||||
// (because no addresses isn't interesting...)
|
||||
if(interfaces.empty()) {
|
||||
interfaces.emplace_back("0.0.0.0");
|
||||
if(ifaces.empty()) {
|
||||
ifaces.emplace_back(SockAddr::any(AF_INET));
|
||||
}
|
||||
|
||||
if(auto_beacon) {
|
||||
expandAddrList(interfaces, beaconDestinations);
|
||||
// use interface list add ipv4 broadcast addresses to beaconDestinations.
|
||||
// 0.0.0.0 -> adds all bcasts
|
||||
// otherwise add bcast for each iface address
|
||||
expandAddrList(ifaces, bdest);
|
||||
addGroups(ifaces, bdest);
|
||||
auto_beacon = false;
|
||||
}
|
||||
|
||||
removeDups(interfaces);
|
||||
removeDups(beaconDestinations);
|
||||
removeDups(ifaces);
|
||||
printAddresses(interfaces, ifaces);
|
||||
removeDups(bdest);
|
||||
printAddresses(beaconDestinations, bdest);
|
||||
removeDups(ignoreAddrs);
|
||||
|
||||
enforceTimeout(tcpTimeout);
|
||||
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& strm, const Config& conf)
|
||||
@@ -419,15 +610,21 @@ void Config::expand()
|
||||
if(tcp_port==0)
|
||||
tcp_port = 5075;
|
||||
|
||||
if(interfaces.empty())
|
||||
interfaces.emplace_back("0.0.0.0");
|
||||
auto ifaces(parseAddresses(interfaces));
|
||||
auto addrs(parseAddresses(addressList));
|
||||
|
||||
if(ifaces.empty())
|
||||
ifaces.emplace_back(SockAddr::any(AF_INET));
|
||||
|
||||
if(autoAddrList) {
|
||||
expandAddrList(interfaces, addressList);
|
||||
expandAddrList(ifaces, addrs);
|
||||
addGroups(ifaces, addrs);
|
||||
autoAddrList = false;
|
||||
}
|
||||
|
||||
removeDups(addressList);
|
||||
printAddresses(interfaces, ifaces);
|
||||
removeDups(addrs);
|
||||
printAddresses(addressList, addrs);
|
||||
|
||||
enforceTimeout(tcpTimeout);
|
||||
}
|
||||
|
||||
+1
-1
@@ -103,7 +103,7 @@ std::ostream& target_information(std::ostream& strm)
|
||||
#endif
|
||||
|
||||
auto localaddr(osiLocalAddr(dummy.sock));
|
||||
strm<<indent{}<<"osiLocalAddr() -> "<<SockAddr(&localaddr.sa, sizeof(localaddr)).tostring()<<"\n";
|
||||
strm<<indent{}<<"osiLocalAddr() -> "<<SockAddr(&localaddr.sa).tostring()<<"\n";
|
||||
|
||||
strm<<indent{}<<"osiSockDiscoverBroadcastAddresses() ->\n";
|
||||
Indented J(strm);
|
||||
|
||||
+319
-69
@@ -44,6 +44,7 @@ namespace pvxs {namespace impl {
|
||||
DEFINE_LOGGER(logerr, "pvxs.loop");
|
||||
DEFINE_LOGGER(logtimer, "pvxs.timer");
|
||||
DEFINE_LOGGER(logiface, "pvxs.iface");
|
||||
DEFINE_LOGGER(logsock, "pvxs.sock");
|
||||
|
||||
namespace mdetail {
|
||||
VFunctor0::~VFunctor0() {}
|
||||
@@ -364,18 +365,22 @@ bool evbase::assertInRunningLoop() const
|
||||
throw std::logic_error("Not in running evbase worker");
|
||||
}
|
||||
|
||||
evsocket::evsocket(evutil_socket_t sock)
|
||||
evsocket::evsocket(int af, evutil_socket_t sock)
|
||||
:sock(sock)
|
||||
,af(af)
|
||||
{
|
||||
if(sock==evutil_socket_t(-1)) {
|
||||
int err = SOCKERRNO;
|
||||
int err = evutil_socket_geterror(sock);
|
||||
#ifdef _WIN32
|
||||
if(err==WSANOTINITIALISED) {
|
||||
throw std::runtime_error("WSANOTINITIALISED");
|
||||
}
|
||||
#endif
|
||||
(void)err;
|
||||
throw std::runtime_error("Unable to allocate socket");
|
||||
throw std::system_error(err, std::system_category());
|
||||
}
|
||||
if(af!=AF_INET && af!=AF_INET6) {
|
||||
evutil_closesocket(sock);
|
||||
throw std::logic_error("Unsupported address family");
|
||||
}
|
||||
|
||||
evutil_make_socket_closeonexec(sock);
|
||||
@@ -392,7 +397,7 @@ evsocket::evsocket(evutil_socket_t sock)
|
||||
#endif
|
||||
|
||||
evsocket::evsocket(int af, int type, int proto)
|
||||
:evsocket(socket(af, type | SOCK_CLOEXEC, proto))
|
||||
:evsocket(af, socket(af, type | SOCK_CLOEXEC, proto))
|
||||
{
|
||||
#ifdef __linux__
|
||||
# ifndef IP_MULTICAST_ALL
|
||||
@@ -411,8 +416,10 @@ evsocket::evsocket(int af, int type, int proto)
|
||||
|
||||
evsocket::evsocket(evsocket&& o) noexcept
|
||||
:sock(o.sock)
|
||||
,af(o.af)
|
||||
{
|
||||
o.sock = evutil_socket_t(-1);
|
||||
o.af = AF_UNSPEC;
|
||||
}
|
||||
|
||||
evsocket& evsocket::operator=(evsocket&& o) noexcept
|
||||
@@ -421,7 +428,9 @@ evsocket& evsocket::operator=(evsocket&& o) noexcept
|
||||
if(sock!=evutil_socket_t(-1))
|
||||
evutil_closesocket(sock);
|
||||
sock = o.sock;
|
||||
af = o.af;
|
||||
o.sock = evutil_socket_t(-1);
|
||||
o.af = AF_UNSPEC;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
@@ -446,56 +455,124 @@ void evsocket::bind(SockAddr& addr) const
|
||||
log_err_printf(logerr, "Unable to fetch address of newly bound socket\n%s", "");
|
||||
}
|
||||
|
||||
void evsocket::mcast_join(const SockAddr& grp, const SockAddr& iface) const
|
||||
void evsocket::set_broadcast(bool b) const
|
||||
{
|
||||
if(grp.family()!=iface.family() || grp.family()!=AF_INET)
|
||||
throw std::invalid_argument("Unsupported address family");
|
||||
|
||||
ip_mreq req{};
|
||||
req.imr_multiaddr.s_addr = grp->in.sin_addr.s_addr;
|
||||
req.imr_interface.s_addr = iface->in.sin_addr.s_addr;
|
||||
|
||||
int ret = setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&req, sizeof(req));
|
||||
if(ret)
|
||||
log_err_printf(logerr, "Unable to join mcast group %s on %s : %s\n",
|
||||
grp.tostring().c_str(), iface.tostring().c_str(),
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
|
||||
// IPV6_ADD_MEMBERSHIP
|
||||
int val = b;
|
||||
if(setsockopt(sock, SOL_SOCKET, SO_BROADCAST, (char *)&val, sizeof(val)))
|
||||
log_err_printf(logerr, "Unable to setup beacon sender SO_BROADCAST: %d\n", SOCKERRNO);
|
||||
}
|
||||
|
||||
void evsocket::mcast_ttl(unsigned ttl) const
|
||||
{
|
||||
int ret = setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, (char*)&ttl, sizeof(ttl));
|
||||
if(ret)
|
||||
log_err_printf(logerr, "Unable to set mcast TTL : %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
#ifndef IPV6_ADD_MEMBERSHIP
|
||||
# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
|
||||
#endif
|
||||
#ifndef IPV6_DROP_MEMBERSHIP
|
||||
# define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
|
||||
#endif
|
||||
|
||||
// ipv6 variant?
|
||||
bool evsocket::mcast_join(const MCastMembership& m) const
|
||||
{
|
||||
if(m.af==AF_INET) {
|
||||
auto& req = m.req.in;
|
||||
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*)&req, sizeof(req))) {
|
||||
log_err_printf(logerr, "Unable to join mcast4 group: %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
return false;
|
||||
}
|
||||
|
||||
} else if(m.af==AF_INET6) {
|
||||
auto& req = m.req.in6;
|
||||
|
||||
if(setsockopt(sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, (char*)&req, sizeof(req))) {
|
||||
log_err_printf(logerr, "Unable to join mcast6 group : %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void evsocket::mcast_leave(const MCastMembership &m) const
|
||||
{
|
||||
if(m.af==AF_INET) {
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP, (char*)&m.req.in, sizeof(m.req.in)))
|
||||
log_err_printf(logerr, "Unable to leave mcast4 group: %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
|
||||
} else if(m.af==AF_INET6) {
|
||||
if(setsockopt(sock, IPPROTO_IPV6, IPV6_DROP_MEMBERSHIP, (char*)&m.req.in6, sizeof(m.req.in6)))
|
||||
log_err_printf(logerr, "Unable to leave mcast6 group: %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
}
|
||||
}
|
||||
|
||||
void evsocket::mcast_prep_sendto(const SockEndpoint& ep) const
|
||||
{
|
||||
if(ep.addr.family()!=af)
|
||||
throw std::logic_error("Inconsistent address family or not mcast");
|
||||
|
||||
else if(!ep.addr.isMCast())
|
||||
return;
|
||||
|
||||
auto& ifmap = IfaceMap::instance();
|
||||
|
||||
if(af==AF_INET) {
|
||||
SockAddr iface(AF_INET);
|
||||
if(!ep.iface.empty())
|
||||
iface = ifmap.address_of(ep.iface);
|
||||
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, (char*)&ep.ttl, sizeof(ep.ttl)))
|
||||
log_err_printf(logerr, "Unable to set mcast TTL : %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF, (char*)&iface->in.sin_addr, sizeof(iface->in.sin_addr)))
|
||||
log_err_printf(logerr, "Unable to set mcast IF : %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
|
||||
} else if(af==AF_INET6) {
|
||||
int index = 0u;
|
||||
if(!ep.iface.empty())
|
||||
index = ifmap.index_of(ep.iface);
|
||||
|
||||
if(setsockopt(sock, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, (char*)&ep.ttl, sizeof(ep.ttl)))
|
||||
log_err_printf(logerr, "Unable to set mcast TTL : %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
|
||||
if(setsockopt(sock, IPPROTO_IPV6, IPV6_MULTICAST_IF, (char*)&index, sizeof(index)))
|
||||
log_err_printf(logerr, "Unable to set mcast IF : %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
}
|
||||
}
|
||||
|
||||
void evsocket::mcast_loop(bool loop) const
|
||||
{
|
||||
unsigned char val = loop ? 1 : 0;
|
||||
int ret = setsockopt(sock, IPPROTO_IP, IP_MULTICAST_LOOP, (char*)&val, sizeof(val));
|
||||
if(ret)
|
||||
log_err_printf(logerr, "Unable to set mcast loopback : %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
/* On Linux (at least) IP_MULTICAST_LOOP is not exactly equivalent to IPV6_MULTICAST_LOOP,
|
||||
* and we are allowed to set both. So we do...
|
||||
*/
|
||||
if(af==AF_INET || af==AF_INET6) {
|
||||
unsigned char val = loop ? 1 : 0;
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_MULTICAST_LOOP, (char*)&val, sizeof(val)))
|
||||
log_err_printf(logerr, "Unable to set mcast loopback4 : %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
|
||||
// IPV6_MULTICAST_LOOP
|
||||
}
|
||||
if(af==AF_INET6) {
|
||||
unsigned int val = loop ? 1 : 0;
|
||||
if(setsockopt(sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, (char*)&val, sizeof(val)))
|
||||
log_err_printf(logerr, "Unable to set mcast loopback6 : %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
}
|
||||
}
|
||||
|
||||
void evsocket::mcast_iface(const SockAddr& iface) const
|
||||
void evsocket::ipv6_only(bool b) const
|
||||
{
|
||||
if(iface.family()!=AF_INET)
|
||||
if(af!=AF_INET6)
|
||||
throw std::invalid_argument("Unsupported address family");
|
||||
|
||||
int ret = setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF, (char*)&iface->in.sin_addr, sizeof(iface->in.sin_addr));
|
||||
if(ret)
|
||||
log_err_printf(logerr, "Unable to set mcast TTL : %s\n",
|
||||
int v=b;
|
||||
if(setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&v, sizeof(v)))
|
||||
log_err_printf(logerr, "Unable to set IPv6 only : %s\n",
|
||||
evutil_socket_error_to_string(evutil_socket_geterror(sock)));
|
||||
|
||||
// IPV6_MULTICAST_IF
|
||||
}
|
||||
|
||||
std::vector<SockAddr> evsocket::broadcasts(const SockAddr* match) const
|
||||
@@ -503,6 +580,11 @@ std::vector<SockAddr> evsocket::broadcasts(const SockAddr* match) const
|
||||
if(match && match->family()!=AF_INET) {
|
||||
throw std::logic_error("osiSockDiscoverBroadcastAddresses() only understands AF_INET");
|
||||
}
|
||||
|
||||
std::vector<SockAddr> ret;
|
||||
if(af==AF_INET6)
|
||||
return ret; // IPv6 does not have broadcast addresses
|
||||
|
||||
evsocket dummy(AF_INET, SOCK_DGRAM, 0);
|
||||
|
||||
osiSockAddr realmatch;
|
||||
@@ -517,7 +599,6 @@ std::vector<SockAddr> evsocket::broadcasts(const SockAddr* match) const
|
||||
ELLLIST bcasts = ELLLIST_INIT;
|
||||
osiSockDiscoverBroadcastAddresses(&bcasts, dummy.sock, &realmatch);
|
||||
|
||||
std::vector<SockAddr> ret;
|
||||
ret.reserve(ellCount(&bcasts));
|
||||
|
||||
while(ellCount(&bcasts)) {
|
||||
@@ -525,55 +606,224 @@ std::vector<SockAddr> evsocket::broadcasts(const SockAddr* match) const
|
||||
ellDelete(&bcasts, cur);
|
||||
osiSockAddrNode *node = CONTAINER(cur, osiSockAddrNode, node);
|
||||
if(node->addr.sa.sa_family==AF_INET)
|
||||
ret.emplace_back(&node->addr.sa, sizeof(node->addr));
|
||||
ret.emplace_back(&node->addr.sa);
|
||||
free(node);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
#if defined(_WIN32) && !defined(EAFNOSUPPORT)
|
||||
# define EAFNOSUPPORT WSAESOCKTNOSUPPORT
|
||||
#endif
|
||||
|
||||
bool evsocket::canIPv6()
|
||||
{
|
||||
auto sock = socket(AF_INET6, SOCK_DGRAM, 0);
|
||||
if(sock!=evutil_socket_t(-1)) {
|
||||
evutil_closesocket(sock);
|
||||
return true;
|
||||
}
|
||||
int err = evutil_socket_geterror(sock);
|
||||
if(err!=EAFNOSUPPORT) {
|
||||
log_warn_printf(logsock, "Unexpected errno %d while probing IPv6: %s\n",
|
||||
err, evutil_socket_error_to_string(err));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
#if EPICS_VERSION_INT<VERSION_INT(7,0,3,1)
|
||||
# define getMonotonic getCurrent
|
||||
#endif
|
||||
|
||||
static epicsThreadOnceId mapOnce = EPICS_THREAD_ONCE_INIT;
|
||||
|
||||
static IfaceMap* theinstance;
|
||||
|
||||
static
|
||||
void mapInit(void*)
|
||||
{
|
||||
theinstance = new IfaceMap();
|
||||
}
|
||||
|
||||
IfaceMap& IfaceMap::instance()
|
||||
{
|
||||
threadOnce(&mapOnce, &mapInit);
|
||||
assert(theinstance);
|
||||
return *theinstance;
|
||||
}
|
||||
|
||||
void IfaceMap::cleanup()
|
||||
{
|
||||
delete theinstance;
|
||||
theinstance = nullptr;
|
||||
}
|
||||
|
||||
IfaceMap::IfaceMap()
|
||||
{
|
||||
refresh();
|
||||
updated = epicsTime::getMonotonic();
|
||||
}
|
||||
|
||||
bool IfaceMap::has_address(int64_t ifindex, const SockAddr &addr)
|
||||
void IfaceMap::refresh(bool force)
|
||||
{
|
||||
auto now(epicsTime::getMonotonic());
|
||||
auto age = now-updated;
|
||||
double threshold = force ? 10.0 : 600.0; // TODO: configurable?
|
||||
if(age<threshold && !byIndex.empty())
|
||||
return;
|
||||
log_debug_printf(logiface, "refresh%s after %.1f sec\n", force?" forced":"", age);
|
||||
auto temp = _refresh();
|
||||
// cross-index
|
||||
decltype (byName) tempN;
|
||||
decltype (byAddr) tempA;
|
||||
for(auto& pair : temp) {
|
||||
auto& iface = pair.second;
|
||||
tempN[iface.name] = &iface;
|
||||
for(auto& pair : iface.addrs) {
|
||||
tempA.emplace(pair.first, std::make_pair(&iface, false));
|
||||
if(pair.second.family()==AF_INET)
|
||||
tempA.emplace(pair.second, std::make_pair(&iface, true));
|
||||
}
|
||||
}
|
||||
byIndex.swap(temp);
|
||||
byName.swap(tempN);
|
||||
byAddr.swap(tempA);
|
||||
updated = now;
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
template<typename FN>
|
||||
bool try_cache(IfaceMap& self, FN&& fn)
|
||||
{
|
||||
bool force = false;
|
||||
retry:
|
||||
self.refresh(force);
|
||||
bool found = fn();
|
||||
if(!found && !force) {
|
||||
force = true;
|
||||
goto retry;
|
||||
}
|
||||
return found;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
bool IfaceMap::has_address(uint64_t ifindex, const SockAddr &addr)
|
||||
{
|
||||
Guard G(lock);
|
||||
|
||||
if(addr.isAny())
|
||||
return true;
|
||||
|
||||
auto now(epicsTime::getMonotonic());
|
||||
auto age = now-updated;
|
||||
bool first = true;
|
||||
|
||||
retry:
|
||||
if(!first || age > 60) {
|
||||
refresh();
|
||||
updated = now;
|
||||
} else {
|
||||
log_debug_printf(logiface, "using cache age %.2f sec\n", age);
|
||||
}
|
||||
|
||||
auto ifit(info.find(ifindex));
|
||||
if(ifit!=info.end()) {
|
||||
const auto& iface = ifit->second;
|
||||
auto adit(iface.find(addr));
|
||||
return adit!=iface.end();
|
||||
}
|
||||
if(first) {
|
||||
// re-try once
|
||||
first = false;
|
||||
goto retry;
|
||||
}
|
||||
log_warn_printf(logiface, "Encountered unknown interface index %lld\n", (long long)ifindex);
|
||||
return false;
|
||||
bool found = try_cache(*this, [this, ifindex, &addr]() {
|
||||
auto ifit(byIndex.find(ifindex));
|
||||
if(ifit!=byIndex.end()) {
|
||||
const auto& addrs = ifit->second.addrs;
|
||||
return addrs.find(addr)!=addrs.end();
|
||||
}
|
||||
return false;
|
||||
});
|
||||
return found;
|
||||
}
|
||||
|
||||
std::string IfaceMap::name_of(uint64_t index)
|
||||
{
|
||||
Guard G(lock);
|
||||
|
||||
std::string name;
|
||||
bool found = try_cache(*this, [this, index, &name](){
|
||||
auto it(byIndex.find(index));
|
||||
if(it!=byIndex.end()) {
|
||||
name = it->second.name;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
if(!found) {
|
||||
// fallback to numeric index
|
||||
name = SB()<<index;
|
||||
}
|
||||
return name;
|
||||
}
|
||||
|
||||
std::string IfaceMap::name_of(const SockAddr& addr)
|
||||
{
|
||||
Guard G(lock);
|
||||
|
||||
std::string name;
|
||||
try_cache(*this, [this, addr, &name](){
|
||||
auto it(byAddr.find(addr));
|
||||
if(it!=byAddr.end()) {
|
||||
name = it->second.first->name;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
return name;
|
||||
}
|
||||
|
||||
uint64_t IfaceMap::index_of(const std::string& name)
|
||||
{
|
||||
Guard G(lock);
|
||||
|
||||
uint64_t ret = 0u;
|
||||
try_cache(*this, [&ret, this, name]() {
|
||||
auto it = byName.find(name);
|
||||
bool hit = it!=byName.end();
|
||||
if(hit)
|
||||
ret = it->second->index;
|
||||
return hit;
|
||||
});
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool IfaceMap::is_address(const SockAddr& addr)
|
||||
{
|
||||
Guard G(lock);
|
||||
|
||||
return try_cache(*this, [this, addr]() {
|
||||
return byAddr.find(addr)!=byAddr.end();
|
||||
});
|
||||
}
|
||||
|
||||
bool IfaceMap::is_broadcast(const SockAddr& addr)
|
||||
{
|
||||
Guard G(lock);
|
||||
|
||||
return try_cache(*this, [this, addr]() {
|
||||
auto it(byAddr.find(addr));
|
||||
return it!=byAddr.end() && it->second.second;
|
||||
});
|
||||
}
|
||||
|
||||
SockAddr IfaceMap::address_of(const std::string& name)
|
||||
{
|
||||
Guard G(lock);
|
||||
|
||||
SockAddr ret;
|
||||
try_cache(*this, [this, name, &ret]() {
|
||||
auto it(byName.find(name));
|
||||
if(it!=byName.end() && !it->second->addrs.empty()) {
|
||||
ret = it->second->addrs.begin()->first;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::set<std::string> IfaceMap::all_external()
|
||||
{
|
||||
std::set<std::string> ret;
|
||||
Guard G(lock);
|
||||
refresh();
|
||||
for(auto& pair : byIndex) {
|
||||
ret.emplace(pair.second.name);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
void to_wire(Buffer& buf, const SockAddr& val)
|
||||
{
|
||||
if(!buf.ensure(16)) {
|
||||
|
||||
+64
-16
@@ -193,12 +193,13 @@ void from_wire(Buffer &buf, SockAddr& val);
|
||||
struct PVXS_API evsocket
|
||||
{
|
||||
evutil_socket_t sock;
|
||||
int af;
|
||||
|
||||
// default construct an invalid socket
|
||||
constexpr evsocket() noexcept :sock(-1) {}
|
||||
constexpr evsocket() noexcept :sock(-1), af(AF_UNSPEC) {}
|
||||
|
||||
// construct from a valid (not -1) socket
|
||||
explicit evsocket(evutil_socket_t sock);
|
||||
explicit evsocket(int af, evutil_socket_t sock);
|
||||
|
||||
// create a new socket
|
||||
evsocket(int, int, int);
|
||||
@@ -217,33 +218,80 @@ struct PVXS_API evsocket
|
||||
inline operator bool() const { return sock!=-1; }
|
||||
|
||||
void bind(SockAddr& addr) const;
|
||||
//! join mcast group. Receive mcasts send to this group which arrive on the given interface
|
||||
//! @see IP_ADD_MEMBERSHIP
|
||||
void mcast_join(const SockAddr& grp, const SockAddr& iface) const;
|
||||
//! Set time-to-live out mcasts sent from this socket
|
||||
//! @see IP_MULTICAST_TTL
|
||||
void mcast_ttl(unsigned ttl) const;
|
||||
|
||||
void set_broadcast(bool b) const;
|
||||
|
||||
//! Join multicast group, optionally on selected interface
|
||||
bool mcast_join(const MCastMembership& m) const;
|
||||
//! Reverse previous join
|
||||
void mcast_leave(const MCastMembership& m) const;
|
||||
//! Prepare socket for subsequent sendto() with TTL and output interface
|
||||
void mcast_prep_sendto(const SockEndpoint& ep) const;
|
||||
|
||||
//! Whether mcasts sent from this socket should be received to local listeners
|
||||
//! @see IP_MULTICAST_LOOP
|
||||
//! @see IP_MULTICAST_LOOP and IPV6_MULTICAST_LOOP
|
||||
void mcast_loop(bool loop) const;
|
||||
//! Selects interface to use when sending mcasts
|
||||
//! @see IP_MULTICAST_IF
|
||||
void mcast_iface(const SockAddr& iface) const;
|
||||
//! Disable IPv4 through IPv6 socket
|
||||
void ipv6_only(bool b=true) const;
|
||||
|
||||
//! Linux specific include OS dropped packet counter as cmsg
|
||||
void enable_SO_RXQ_OVFL() const;
|
||||
|
||||
void enable_IP_PKTINFO() const;
|
||||
|
||||
//! wraps osiSockDiscoverBroadcastAddresses()
|
||||
std::vector<SockAddr> broadcasts(const SockAddr* match=nullptr) const;
|
||||
|
||||
static
|
||||
bool canIPv6();
|
||||
};
|
||||
|
||||
struct PVXS_API IfaceMap {
|
||||
static
|
||||
IfaceMap& instance();
|
||||
static
|
||||
void cleanup();
|
||||
|
||||
IfaceMap();
|
||||
|
||||
// return true if ifindex is valid, and addr is one of the addresses currently assigned to it.
|
||||
bool has_address(int64_t ifindex, const SockAddr& addr);
|
||||
// return true if ifindex is valid, and addr an interface address assigned to it.
|
||||
bool has_address(uint64_t ifindex, const SockAddr& addr);
|
||||
// lookup interface name by index
|
||||
std::string name_of(uint64_t index);
|
||||
// find (an) interface name with this address. useful for IPv4. returns empty string if not found.
|
||||
std::string name_of(const SockAddr& addr);
|
||||
// returns 0 if not found
|
||||
uint64_t index_of(const std::string& name);
|
||||
// is this a valid interface or broadcast address?
|
||||
bool is_address(const SockAddr& addr);
|
||||
// is this a valid interface or broadcast address?
|
||||
bool is_broadcast(const SockAddr& addr);
|
||||
// look up interface address. useful for IPV4. returns AF_UNSPEC if not found
|
||||
SockAddr address_of(const std::string& name);
|
||||
// all interface names except LO
|
||||
std::set<std::string> all_external();
|
||||
|
||||
void refresh();
|
||||
// caller must hold lock
|
||||
void refresh(bool force=false);
|
||||
|
||||
std::map<int64_t, std::set<SockAddr, SockAddrOnlyLess>> info;
|
||||
struct Iface {
|
||||
std::string name;
|
||||
uint64_t index;
|
||||
bool isLO;
|
||||
// interface address(s) -> (maybe) broadcast addr
|
||||
std::map<SockAddr, SockAddr, SockAddrOnlyLess> addrs;
|
||||
Iface(const std::string& name, uint64_t index, bool isLO) :name(name), index(index), isLO(isLO) {}
|
||||
};
|
||||
|
||||
epicsMutex lock;
|
||||
std::map<uint64_t, Iface> byIndex;
|
||||
std::map<std::string, Iface*> byName;
|
||||
// map address to tuple of interface and broadcast?
|
||||
std::multimap<SockAddr, std::pair<Iface*, bool>, SockAddrOnlyLess> byAddr;
|
||||
epicsTime updated;
|
||||
private:
|
||||
static
|
||||
decltype (byIndex) _refresh();
|
||||
};
|
||||
|
||||
} // namespace impl
|
||||
|
||||
+85
-18
@@ -54,13 +54,20 @@ void osiSockAttachExt()
|
||||
epicsThreadOnce(&oseOnce, &oseDoOnce, nullptr);
|
||||
}
|
||||
|
||||
void enable_SO_RXQ_OVFL(SOCKET sock) {}
|
||||
void evsocket::enable_SO_RXQ_OVFL() const {}
|
||||
|
||||
void enable_IP_PKTINFO(SOCKET sock)
|
||||
void evsocket::enable_IP_PKTINFO() const
|
||||
{
|
||||
int val = 1;
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val)))
|
||||
log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO);
|
||||
if(af==AF_INET) {
|
||||
int val = 1;
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val)))
|
||||
log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO);
|
||||
|
||||
} else if(af==AF_INET6) {
|
||||
int val = 1;
|
||||
if(setsockopt(sock, IPPROTO_IPV6, IPV6_PKTINFO, (char*)&val, sizeof(val)))
|
||||
log_warn_printf(log, "Unable to set IPV6_RECVPKTINFO: %d\n", SOCKERRNO);
|
||||
}
|
||||
}
|
||||
|
||||
int recvfromx::call()
|
||||
@@ -77,7 +84,8 @@ int recvfromx::call()
|
||||
msg.name = &(*src)->sa;
|
||||
msg.namelen = src->size();
|
||||
|
||||
alignas (alignof (WSACMSGHDR)) char cbuf[WSA_CMSG_SPACE(sizeof(in_pktinfo))];
|
||||
// will receive either in6_pktinfo or in_pktinfo, not both. in6_pktinfo is larger
|
||||
alignas (WSACMSGHDR) char cbuf[WSA_CMSG_SPACE(sizeof(in6_pktinfo))];
|
||||
msg.Control = {sizeof(cbuf), cbuf};
|
||||
|
||||
DWORD nrx=0u;
|
||||
@@ -96,6 +104,16 @@ int recvfromx::call()
|
||||
memcpy(&idx, WSA_CMSG_DATA(hdr) + offsetof(in_pktinfo, ipi_ifindex), sizeof(idx));
|
||||
dstif = idx;
|
||||
}
|
||||
else if(hdr->cmsg_level==IPPROTO_IPV6 && hdr->cmsg_type==IPV6_PKTINFO && hdr->cmsg_len>=WSA_CMSG_LEN(sizeof(in6_pktinfo))) {
|
||||
if(dst) {
|
||||
(*dst)->in6.sin6_family = AF_INET6;
|
||||
memcpy(&(*dst)->in6.sin6_addr, WSA_CMSG_DATA(hdr) + offsetof(in6_pktinfo, ipi6_addr), sizeof(in6_addr));
|
||||
}
|
||||
|
||||
decltype(in6_pktinfo::ipi6_ifindex) idx;
|
||||
memcpy(&idx, WSA_CMSG_DATA(hdr) + offsetof(in6_pktinfo, ipi6_ifindex), sizeof(idx));
|
||||
dstif = idx;
|
||||
}
|
||||
}
|
||||
|
||||
return nrx;
|
||||
@@ -111,12 +129,16 @@ namespace impl {
|
||||
# define GAA_FLAG_INCLUDE_ALL_INTERFACES 0
|
||||
#endif
|
||||
|
||||
void IfaceMap::refresh() {
|
||||
decltype (IfaceMap::byIndex) IfaceMap::_refresh() {
|
||||
std::vector<char> ifaces(1024u);
|
||||
decltype (info) temp;
|
||||
decltype (byIndex) temp;
|
||||
|
||||
{
|
||||
constexpr ULONG flags = GAA_FLAG_SKIP_ANYCAST|GAA_FLAG_SKIP_MULTICAST|GAA_FLAG_SKIP_DNS_SERVER|GAA_FLAG_INCLUDE_ALL_INTERFACES;
|
||||
constexpr ULONG flags = GAA_FLAG_SKIP_ANYCAST
|
||||
|GAA_FLAG_SKIP_MULTICAST
|
||||
|GAA_FLAG_SKIP_DNS_SERVER
|
||||
|GAA_FLAG_INCLUDE_PREFIX
|
||||
|GAA_FLAG_INCLUDE_ALL_INTERFACES;
|
||||
|
||||
ULONG buflen = ifaces.size();
|
||||
auto err = GetAdaptersAddresses(AF_INET, flags, 0, reinterpret_cast<IP_ADAPTER_ADDRESSES*>(ifaces.data()), &buflen);
|
||||
@@ -130,28 +152,73 @@ void IfaceMap::refresh() {
|
||||
|
||||
if(err) {
|
||||
log_warn_printf(logiface, "Unable to GetAdaptersAddresses() error=%lld\n", (unsigned long long)err);
|
||||
return;
|
||||
return temp;
|
||||
}
|
||||
}
|
||||
|
||||
for(auto iface = reinterpret_cast<const IP_ADAPTER_ADDRESSES*>(ifaces.data()); iface ; iface = iface->Next) {
|
||||
auto& info = temp[iface->IfIndex];
|
||||
|
||||
//TODO: any flags to check?
|
||||
if(!(iface->OperStatus & IfOperStatusUp))
|
||||
continue; // not configured, skip...
|
||||
|
||||
for(auto addr = iface->FirstUnicastAddress; addr; addr = addr->Next) {
|
||||
// TODO: IfIndex vs. Ipv6IfIndex which one to use?
|
||||
|
||||
if(addr->Address.lpSockaddr->sa_family!=AF_INET)
|
||||
bool isLO = iface->IfType==IF_TYPE_SOFTWARE_LOOPBACK;
|
||||
auto pair = temp.emplace(std::piecewise_construct,
|
||||
std::forward_as_tuple(iface->IfIndex),
|
||||
std::forward_as_tuple(iface->AdapterName, iface->IfIndex, isLO));
|
||||
|
||||
auto& info = pair.first->second;
|
||||
|
||||
// find the IPv4 broadcast address, if any
|
||||
std::set<std::pair<SockAddr, SockAddr>> prefixes;
|
||||
for(auto prefix = iface->FirstPrefix; prefix; prefix = prefix->Next) {
|
||||
SockAddr addr(prefix->Address.lpSockaddr);
|
||||
auto p = prefix->PrefixLength;
|
||||
|
||||
if(addr.family()!=AF_INET || p<=0u || p>=32u)
|
||||
continue;
|
||||
|
||||
auto pair = info.emplace(addr->Address.lpSockaddr, sizeof(sockaddr_in));
|
||||
sockaddr_in mask{AF_INET};
|
||||
mask.sin_addr.s_addr = htonl(0xffffffff<<(32u-p));
|
||||
auto pair = prefixes.emplace(addr, (sockaddr*)&mask);
|
||||
|
||||
log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %s\n",
|
||||
(long long)iface->IfIndex, iface->AdapterName, pair.first->tostring().c_str());
|
||||
log_debug_printf(logiface, "Prefix %s/%s\n", addr.tostring().c_str(), pair.first->second.tostring().c_str());
|
||||
}
|
||||
|
||||
for(auto addr = iface->FirstUnicastAddress; addr; addr = addr->Next) {
|
||||
const auto af = addr->Address.lpSockaddr->sa_family;
|
||||
if(af!=AF_INET && af!=AF_INET6) {
|
||||
log_debug_printf(logiface, "Ignoring interface '%s' address family=%d\n",
|
||||
iface->AdapterName, af);
|
||||
continue;
|
||||
}
|
||||
|
||||
SockAddr iaddr(addr->Address.lpSockaddr);
|
||||
SockAddr bcast;
|
||||
if(iaddr.family()==AF_INET && !isLO) {
|
||||
auto A = ntohl(iaddr->in.sin_addr.s_addr);
|
||||
for(auto& pair : prefixes) {
|
||||
auto P = ntohl(pair.first->in.sin_addr.s_addr);
|
||||
auto M = ntohl(pair.second->in.sin_addr.s_addr);
|
||||
if((A&M)==P) {
|
||||
auto B = P | ~M;
|
||||
bcast->in.sin_family = AF_INET;
|
||||
bcast->in.sin_addr.s_addr = htonl(B);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto pair = info.addrs.emplace(iaddr, bcast);
|
||||
|
||||
log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %s/%s\n",
|
||||
(long long)iface->IfIndex, iface->AdapterName,
|
||||
pair.first->first.tostring().c_str(),
|
||||
pair.first->second.tostring().c_str());
|
||||
}
|
||||
}
|
||||
|
||||
info.swap(temp);
|
||||
return temp;
|
||||
}
|
||||
|
||||
} // namespace impl
|
||||
|
||||
@@ -4,6 +4,11 @@
|
||||
* in file LICENSE that is included with this distribution.
|
||||
*/
|
||||
|
||||
#ifdef __APPLE__
|
||||
// expose IPV6_PKTINFO
|
||||
# define __APPLE_USE_RFC_3542
|
||||
#endif
|
||||
|
||||
#include "osiSockExt.h"
|
||||
|
||||
#include <string.h>
|
||||
@@ -19,6 +24,11 @@ extern "C" {
|
||||
}
|
||||
#endif
|
||||
|
||||
// some *BSD (OSX but not RTEMS5/libbsd) use IPV6_PKTINFO to enable RX
|
||||
#if defined(IPV6_PKTINFO) && !defined(IPV6_RECVPKTINFO)
|
||||
# define IPV6_RECVPKTINFO IPV6_PKTINFO
|
||||
#endif
|
||||
|
||||
#include <pvxs/log.h>
|
||||
#include <evhelper.h>
|
||||
|
||||
@@ -29,7 +39,7 @@ DEFINE_LOGGER(logiface, "pvxs.iface");
|
||||
|
||||
void osiSockAttachExt() {}
|
||||
|
||||
void enable_SO_RXQ_OVFL(SOCKET sock)
|
||||
void evsocket::enable_SO_RXQ_OVFL() const
|
||||
{
|
||||
#ifdef SO_RXQ_OVFL
|
||||
// Linux specific feature exposes OS dropped packet count
|
||||
@@ -40,34 +50,42 @@ void enable_SO_RXQ_OVFL(SOCKET sock)
|
||||
#endif
|
||||
}
|
||||
|
||||
void enable_IP_PKTINFO(SOCKET sock)
|
||||
void evsocket::enable_IP_PKTINFO() const
|
||||
{
|
||||
/* linux, some *BSD's (OSX), and winsock package both destination address (from ip header)
|
||||
if(af==AF_INET) {
|
||||
|
||||
/* linux, some *BSD's (OSX), and winsock package both destination address (from ip header)
|
||||
* and receiving interface index (from host) into one IP_PKTINFO control message.
|
||||
* Remaining *BSD's can deliver these in separate IP_ORIGDSTADDR and IP_RECVIF messages.
|
||||
*/
|
||||
#ifdef IP_PKTINFO
|
||||
int val = 1;
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val)))
|
||||
log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO);
|
||||
int val = 1;
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val)))
|
||||
log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO);
|
||||
|
||||
#else
|
||||
# ifdef IP_ORIGDSTADDR
|
||||
{
|
||||
int val = 1;
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_ORIGDSTADDR, (char*)&val, sizeof(val)))
|
||||
log_warn_printf(log, "Unable to set IP_ORIGDSTADDR: %d\n", SOCKERRNO);
|
||||
}
|
||||
{
|
||||
int val = 1;
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_ORIGDSTADDR, (char*)&val, sizeof(val)))
|
||||
log_warn_printf(log, "Unable to set IP_ORIGDSTADDR: %d\n", SOCKERRNO);
|
||||
}
|
||||
|
||||
# endif
|
||||
# ifdef IP_RECVIF
|
||||
{
|
||||
int val = 1;
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_RECVIF, (char*)&val, sizeof(val)))
|
||||
log_warn_printf(log, "Unable to set IP_RECVIF: %d\n", SOCKERRNO);
|
||||
}
|
||||
{
|
||||
int val = 1;
|
||||
if(setsockopt(sock, IPPROTO_IP, IP_RECVIF, (char*)&val, sizeof(val)))
|
||||
log_warn_printf(log, "Unable to set IP_RECVIF: %d\n", SOCKERRNO);
|
||||
}
|
||||
# endif
|
||||
#endif
|
||||
|
||||
} else if(af==AF_INET6) {
|
||||
int val = 1;
|
||||
if(setsockopt(sock, IPPROTO_IPV6, IPV6_RECVPKTINFO, (char*)&val, sizeof(val)))
|
||||
log_warn_printf(log, "Unable to set IPV6_PKTINFO reception: %d\n", SOCKERRNO);
|
||||
}
|
||||
}
|
||||
|
||||
int recvfromx::call()
|
||||
@@ -81,10 +99,12 @@ int recvfromx::call()
|
||||
msg.msg_name = &(*src)->sa;
|
||||
msg.msg_namelen = src ? src->size() : 0u;
|
||||
|
||||
alignas (alignof (cmsghdr)) char cbuf[0u
|
||||
alignas (cmsghdr) char cbuf[0u
|
||||
#ifdef SO_RXQ_OVFL
|
||||
+ CMSG_SPACE(sizeof(ndrop))
|
||||
#endif
|
||||
// only need space for IPv4 option(s) or IPv6 option, never both.
|
||||
+ impl::cmax(0
|
||||
#ifdef IP_PKTINFO
|
||||
+ CMSG_SPACE(sizeof(in_pktinfo))
|
||||
#else
|
||||
@@ -95,6 +115,9 @@ int recvfromx::call()
|
||||
+ CMSG_SPACE(sizeof(sockaddr_dl))
|
||||
# endif
|
||||
#endif
|
||||
,0
|
||||
+ CMSG_SPACE(sizeof(in6_pktinfo))
|
||||
) // cmax
|
||||
];
|
||||
msg.msg_control = cbuf;
|
||||
msg.msg_controllen = sizeof(cbuf);
|
||||
@@ -143,6 +166,16 @@ int recvfromx::call()
|
||||
}
|
||||
# endif
|
||||
#endif
|
||||
else if(hdr->cmsg_level==IPPROTO_IPV6 && hdr->cmsg_type==IPV6_PKTINFO && hdr->cmsg_len>=CMSG_LEN(sizeof(in6_pktinfo))) {
|
||||
if(dst) {
|
||||
(*dst)->in6.sin6_family = AF_INET6;
|
||||
memcpy(&(*dst)->in6.sin6_addr, CMSG_DATA(hdr) + offsetof(in6_pktinfo, ipi6_addr), sizeof(in6_addr));
|
||||
}
|
||||
|
||||
decltype(in6_pktinfo::ipi6_ifindex) idx;
|
||||
memcpy(&idx, CMSG_DATA(hdr) + offsetof(in6_pktinfo, ipi6_ifindex), sizeof(idx));
|
||||
dstif = idx;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,20 +184,22 @@ int recvfromx::call()
|
||||
|
||||
namespace impl {
|
||||
|
||||
void IfaceMap::refresh() {
|
||||
decltype (IfaceMap::byIndex) IfaceMap::_refresh() {
|
||||
ifaddrs* addrs = nullptr;
|
||||
|
||||
decltype (info) temp;
|
||||
decltype (byIndex) temp;
|
||||
|
||||
if(getifaddrs(&addrs)) {
|
||||
log_warn_printf(logiface, "Unable to getifaddrs() errno=%d\n", errno);
|
||||
return;
|
||||
return temp;
|
||||
}
|
||||
|
||||
try {
|
||||
for(const ifaddrs* ifa = addrs; ifa; ifa = ifa->ifa_next) {
|
||||
if(ifa->ifa_addr->sa_family!=AF_INET) {
|
||||
log_debug_printf(logiface, "Ignoring interface '%s' address !ipv4\n", ifa->ifa_name);
|
||||
const auto af = ifa->ifa_addr->sa_family;
|
||||
if((af!=AF_INET && af!=AF_INET6) || ifa->ifa_name[0]=='\0') {
|
||||
log_debug_printf(logiface, "Ignoring interface '%s' address family=%d\n",
|
||||
ifa->ifa_name, af);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -174,12 +209,28 @@ void IfaceMap::refresh() {
|
||||
continue;
|
||||
}
|
||||
|
||||
//TODO: any flags to check?
|
||||
if(!(ifa->ifa_flags&IFF_UP))
|
||||
continue; // not configured, skip...
|
||||
|
||||
auto pair = temp[idx].emplace(ifa->ifa_addr, sizeof(sockaddr_in));
|
||||
auto it = temp.find(idx);
|
||||
if(it==temp.end()) {
|
||||
// encountering new index
|
||||
bool isLO = ifa->ifa_flags&IFF_LOOPBACK;
|
||||
auto pair = temp.emplace(std::piecewise_construct,
|
||||
std::forward_as_tuple(idx),
|
||||
std::forward_as_tuple(ifa->ifa_name, idx, isLO));
|
||||
assert(pair.second);
|
||||
it = pair.first;
|
||||
}
|
||||
|
||||
log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %s\n",
|
||||
(long long)idx, ifa->ifa_name, pair.first->tostring().c_str());
|
||||
// IFF_BROADCAST does not apply to IPv6
|
||||
bool hasB = ifa->ifa_addr->sa_family==AF_INET && (ifa->ifa_flags&IFF_BROADCAST);
|
||||
|
||||
auto pair = it->second.addrs.emplace(SockAddr(ifa->ifa_addr),
|
||||
SockAddr(hasB ? ifa->ifa_broadaddr : nullptr));
|
||||
|
||||
log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %d %s\n",
|
||||
(long long)idx, ifa->ifa_name, af, pair.first->first.tostring().c_str());
|
||||
}
|
||||
|
||||
} catch(...){
|
||||
@@ -188,7 +239,7 @@ void IfaceMap::refresh() {
|
||||
}
|
||||
freeifaddrs(addrs);
|
||||
|
||||
info.swap(temp);
|
||||
return temp;
|
||||
}
|
||||
|
||||
} // namespace impl
|
||||
|
||||
+70
-13
@@ -10,11 +10,21 @@
|
||||
#include <osiSock.h>
|
||||
|
||||
#include <string>
|
||||
#include <string.h>
|
||||
|
||||
#include <event2/util.h>
|
||||
|
||||
#include <pvxs/version.h>
|
||||
|
||||
// added with Base 3.15
|
||||
#ifndef SOCK_EADDRNOTAVAIL
|
||||
# ifdef _WIN32
|
||||
# define SOCK_EADDRNOTAVAIL WSAEADDRNOTAVAIL
|
||||
# else
|
||||
# define SOCK_EADDRNOTAVAIL EADDRNOTAVAIL
|
||||
# endif
|
||||
#endif
|
||||
|
||||
namespace pvxs {
|
||||
|
||||
PVXS_API
|
||||
@@ -39,16 +49,16 @@ private:
|
||||
public:
|
||||
|
||||
explicit SockAddr(int af = AF_UNSPEC);
|
||||
explicit SockAddr(int af, const char *address, unsigned short port=0);
|
||||
explicit SockAddr(const sockaddr *addr, ev_socklen_t len);
|
||||
inline explicit SockAddr(int af, const std::string& address) :SockAddr(af, address.c_str()) {}
|
||||
explicit SockAddr(const char *address, unsigned short port=0);
|
||||
explicit SockAddr(const sockaddr *addr);
|
||||
inline explicit SockAddr(const std::string& address, unsigned short port=0) :SockAddr(address.c_str(), port) {}
|
||||
|
||||
size_t size() const;
|
||||
size_t size() const noexcept;
|
||||
inline
|
||||
size_t capacity() const { return sizeof(store); }
|
||||
|
||||
inline unsigned short family() const { return store.sa.sa_family; }
|
||||
unsigned short port() const;
|
||||
inline unsigned short family() const noexcept { return store.sa.sa_family; }
|
||||
unsigned short port() const noexcept;
|
||||
void setPort(unsigned short port);
|
||||
SockAddr withPort(unsigned short port) const {
|
||||
SockAddr temp(*this);
|
||||
@@ -57,10 +67,15 @@ public:
|
||||
}
|
||||
|
||||
void setAddress(const char *, unsigned short port=0);
|
||||
inline void setAddress(const std::string& s, unsigned short port=0) {
|
||||
setAddress(s.c_str(), port);
|
||||
}
|
||||
|
||||
bool isAny() const;
|
||||
bool isLO() const;
|
||||
bool isMCast() const;
|
||||
bool isAny() const noexcept;
|
||||
bool isLO() const noexcept;
|
||||
bool isMCast() const noexcept;
|
||||
|
||||
SockAddr map4to6() const;
|
||||
|
||||
store_t* operator->() { return &store; }
|
||||
const store_t* operator->() const { return &store; }
|
||||
@@ -95,11 +110,53 @@ struct SockAddrOnlyLess {
|
||||
PVXS_API
|
||||
std::ostream& operator<<(std::ostream& strm, const SockAddr& addr);
|
||||
|
||||
// Linux specific include OS dropped packet counter as cmsg
|
||||
void enable_SO_RXQ_OVFL(SOCKET sock);
|
||||
// Include destination address as cmsg
|
||||
// resolved multicast group membership request
|
||||
struct MCastMembership {
|
||||
int af = AF_UNSPEC;
|
||||
union {
|
||||
ip_mreq in;
|
||||
ipv6_mreq in6;
|
||||
} req{};
|
||||
bool operator<(const MCastMembership& o) const {
|
||||
if(af==o.af) {
|
||||
if(af==AF_INET)
|
||||
return memcmp(&req.in, &o.req.in, sizeof(o.req.in));
|
||||
else
|
||||
return memcmp(&req.in6, &o.req.in6, sizeof(o.req.in6));
|
||||
}
|
||||
return af<o.af;
|
||||
}
|
||||
};
|
||||
|
||||
/** search/beacon destination
|
||||
*
|
||||
* <IP46>
|
||||
* <IP46>,<ttl#>
|
||||
* <IP46>@iface
|
||||
* <IP46>,<ttl#>@iface
|
||||
*/
|
||||
struct PVXS_API SockEndpoint {
|
||||
SockAddr addr; // ucast, mcast, or bcast
|
||||
// if mcast, then output TTL and interface
|
||||
int ttl=-1;
|
||||
std::string iface;
|
||||
|
||||
SockEndpoint() = default;
|
||||
SockEndpoint(const char* ep, uint16_t defport=0);
|
||||
SockEndpoint(const std::string& ep, uint16_t defport=0) :SockEndpoint(ep.c_str(), defport) {}
|
||||
explicit SockEndpoint(const SockAddr& addr) :addr(addr) {}
|
||||
|
||||
MCastMembership resolve() const;
|
||||
};
|
||||
|
||||
PVXS_API
|
||||
void enable_IP_PKTINFO(SOCKET sock);
|
||||
std::ostream& operator<<(std::ostream& strm, const SockEndpoint& addr);
|
||||
|
||||
PVXS_API
|
||||
bool operator==(const SockEndpoint& lhs, const SockEndpoint& rhs);
|
||||
|
||||
inline
|
||||
bool operator!=(const SockEndpoint& lhs, const SockEndpoint& rhs) { return !(lhs==rhs); }
|
||||
|
||||
struct recvfromx {
|
||||
evutil_socket_t sock;
|
||||
|
||||
+8
-2
@@ -589,8 +589,13 @@ enum class pva_subcmd {
|
||||
};
|
||||
|
||||
struct Header {
|
||||
uint8_t cmd, flags;
|
||||
uint32_t len;
|
||||
uint8_t cmd=0u, flags=0u, version=0u;
|
||||
uint32_t len=0u;
|
||||
constexpr Header() {}
|
||||
explicit
|
||||
constexpr Header(uint8_t cmd, uint8_t flags, uint32_t len)
|
||||
:cmd(cmd), flags(flags), version(0u), len(len)
|
||||
{}
|
||||
};
|
||||
|
||||
template<typename Buf>
|
||||
@@ -622,6 +627,7 @@ void from_wire(Buf& buf, Header& H)
|
||||
} else {
|
||||
H.cmd = buf[3];
|
||||
H.flags = buf[2];
|
||||
H.version = buf[1];
|
||||
// Set/change buffer endianness
|
||||
buf.be = H.flags&pva_flags::MSB;
|
||||
buf.skip(4u, __FILE__, __LINE__);
|
||||
|
||||
+7
-2
@@ -940,10 +940,15 @@ public:
|
||||
DiscoverBuilder Context::discover(std::function<void (const Discovered &)> && fn) { return DiscoverBuilder(pvt, std::move(fn)); }
|
||||
|
||||
struct PVXS_API Config {
|
||||
//! List of unicast and broadcast addresses
|
||||
/** List of unicast, multicast, and broadcast addresses to which search requests will be sent.
|
||||
*
|
||||
* Entries may take the forms:
|
||||
* - <ipaddr>[:<port#>]
|
||||
* - <ipmultiaddr>[:<port>][,<ttl>][@<ifaceaddr>]
|
||||
*/
|
||||
std::vector<std::string> addressList;
|
||||
|
||||
//! List of interface addresses on which beacons may be received.
|
||||
//! List of local interface addresses on which beacons may be received.
|
||||
//! Also constrains autoAddrList to only consider broadcast addresses of listed interfaces.
|
||||
//! Empty implies wildcard 0.0.0.0
|
||||
std::vector<std::string> interfaces;
|
||||
|
||||
+43
-26
@@ -375,10 +375,12 @@ std::ostream& operator<<(std::ostream& strm, const Server& serv)
|
||||
}
|
||||
|
||||
Server::Pvt::Pvt(const Config &conf)
|
||||
:effective(conf)
|
||||
:canIPv6(evsocket::canIPv6())
|
||||
,effective(conf)
|
||||
,beaconMsg(128)
|
||||
,acceptor_loop("PVXTCP", epicsThreadPriorityCAServerLow-2)
|
||||
,beaconSender(AF_INET, SOCK_DGRAM, 0)
|
||||
,beaconSender4(AF_INET, SOCK_DGRAM, 0)
|
||||
,beaconSender6(AF_INET6, SOCK_DGRAM, 0)
|
||||
,beaconTimer(event_new(acceptor_loop.base, -1, EV_TIMEOUT, doBeaconsS, this))
|
||||
,searchReply(0x10000)
|
||||
,builtinsrc(StaticSource::build())
|
||||
@@ -386,62 +388,74 @@ Server::Pvt::Pvt(const Config &conf)
|
||||
{
|
||||
effective.expand();
|
||||
|
||||
{
|
||||
int val = 1;
|
||||
if(setsockopt(beaconSender.sock, SOL_SOCKET, SO_BROADCAST, (char *)&val, sizeof(val)))
|
||||
log_err_printf(serversetup, "Unable to setup beacon sender SO_BROADCAST: %d\n", SOCKERRNO);
|
||||
}
|
||||
beaconSender4.set_broadcast(true);
|
||||
|
||||
auto manager = UDPManager::instance();
|
||||
|
||||
evsocket dummy(AF_INET, SOCK_DGRAM, 0);
|
||||
|
||||
for(const auto& iface : effective.interfaces) {
|
||||
SockAddr addr(AF_INET, iface.c_str());
|
||||
addr.setPort(effective.udp_port);
|
||||
const auto cb(std::bind(&Pvt::onSearch, this, std::placeholders::_1));
|
||||
|
||||
listeners.push_back(manager.onSearch(addr,
|
||||
std::bind(&Pvt::onSearch, this, std::placeholders::_1) ));
|
||||
std::vector<SockAddr> tcpifaces; // may have port zero
|
||||
for(const auto& iface : effective.interfaces) {
|
||||
SockEndpoint addr(iface.c_str());
|
||||
if(!addr.addr.isMCast())
|
||||
tcpifaces.push_back(addr.addr);
|
||||
|
||||
addr.addr.setPort(effective.udp_port);
|
||||
|
||||
listeners.push_back(manager.onSearch(addr, cb));
|
||||
|
||||
// update to allow udp_port==0
|
||||
effective.udp_port = addr.port();
|
||||
effective.udp_port = addr.addr.port();
|
||||
|
||||
if(addr.addr.family()==AF_INET && addr.addr.isAny()) {
|
||||
// if listening on 0.0.0.0, also listen on [::]
|
||||
auto any6(addr);
|
||||
any6.addr = SockAddr::any(AF_INET6);
|
||||
|
||||
listeners.push_back(manager.onSearch(any6, cb));
|
||||
}
|
||||
|
||||
#ifndef _WIN32
|
||||
if(!addr.isAny()) {
|
||||
if(addr.addr.family()==AF_INET && !addr.addr.isAny() && !addr.addr.isMCast()) {
|
||||
/* An oddness of BSD sockets (not winsock) is that binding to
|
||||
* INADDR_ANY will receive unicast and broadcast, but binding to
|
||||
* a specific interface address receives only unicast. The trick
|
||||
* is to bind a second socket to the interface broadcast address,
|
||||
* which will then receive only broadcasts.
|
||||
*/
|
||||
for(auto bcast : dummy.broadcasts(&addr)) {
|
||||
bcast.setPort(addr.port());
|
||||
listeners.push_back(manager.onSearch(bcast,
|
||||
std::bind(&Pvt::onSearch, this, std::placeholders::_1) ));
|
||||
for(auto bcast : dummy.broadcasts(&addr.addr)) {
|
||||
bcast.setPort(addr.addr.port());
|
||||
listeners.push_back(manager.onSearch(bcast, cb));
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
for(const auto& addr : effective.ignoreAddrs) {
|
||||
SockAddr temp(AF_INET, addr.c_str());
|
||||
SockAddr temp(addr.c_str());
|
||||
ignoreList.push_back(temp);
|
||||
}
|
||||
|
||||
|
||||
acceptor_loop.call([this](){
|
||||
acceptor_loop.call([this, &tcpifaces](){
|
||||
// from accepter worker
|
||||
|
||||
bool firstiface = true;
|
||||
for(const auto& addr : effective.interfaces) {
|
||||
interfaces.emplace_back(addr, effective.tcp_port, this, firstiface);
|
||||
for(auto& addr : tcpifaces) {
|
||||
if(addr.port()==0)
|
||||
addr.setPort(effective.tcp_port);
|
||||
|
||||
interfaces.emplace_back(addr, this, firstiface);
|
||||
|
||||
if(firstiface || effective.tcp_port==0)
|
||||
effective.tcp_port = interfaces.back().bind_addr.port();
|
||||
firstiface = false;
|
||||
}
|
||||
|
||||
for(const auto& addr : effective.beaconDestinations) {
|
||||
beaconDest.emplace_back(AF_INET, addr.c_str(), effective.udp_port);
|
||||
beaconDest.emplace_back(addr.c_str(), effective.udp_port);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -706,10 +720,13 @@ void Server::Pvt::doBeacons(short evt)
|
||||
assert(M.good() && H.good());
|
||||
|
||||
for(const auto& dest : beaconDest) {
|
||||
int ntx = sendto(beaconSender.sock, (char*)beaconMsg.data(), pktlen, 0, &dest->sa, dest.size());
|
||||
auto& sender = dest.addr.family()==AF_INET ? beaconSender4 : beaconSender6;
|
||||
sender.mcast_prep_sendto(dest);
|
||||
|
||||
int ntx = sendto(sender.sock, (char*)beaconMsg.data(), pktlen, 0, &dest.addr->sa, dest.addr.size());
|
||||
|
||||
if(ntx<0) {
|
||||
int err = evutil_socket_geterror(beaconSender.sock);
|
||||
int err = evutil_socket_geterror(sender.sock);
|
||||
auto lvl = Level::Warn;
|
||||
if(err==EINTR || err==EPERM)
|
||||
lvl = Level::Debug;
|
||||
@@ -721,7 +738,7 @@ void Server::Pvt::doBeacons(short evt)
|
||||
unsigned(ntx), unsigned(pktlen));
|
||||
|
||||
} else {
|
||||
log_debug_printf(serverio, "Beacon tx to %s\n", dest.tostring().c_str());
|
||||
log_debug_printf(serverio, "Beacon tx to %s\n", std::string(SB()<<dest).c_str());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+14
-9
@@ -46,7 +46,7 @@ DEFINE_LOGGER(remote, "pvxs.remote.log");
|
||||
ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *peer, int socklen)
|
||||
:ConnBase(false,
|
||||
bufferevent_socket_new(iface->server->acceptor_loop.base, sock, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS),
|
||||
SockAddr(peer, socklen))
|
||||
SockAddr(peer))
|
||||
,iface(iface)
|
||||
{
|
||||
log_debug_printf(connio, "Client %s connects\n", peerName.c_str());
|
||||
@@ -379,14 +379,21 @@ void ServerConn::bevWrite()
|
||||
}
|
||||
|
||||
|
||||
ServIface::ServIface(const std::string& addr, unsigned short port, server::Server::Pvt *server, bool fallback)
|
||||
ServIface::ServIface(const SockAddr &addr, server::Server::Pvt *server, bool fallback)
|
||||
:server(server)
|
||||
,bind_addr(AF_INET, addr.c_str(), port)
|
||||
,sock(AF_INET, SOCK_STREAM, 0)
|
||||
,bind_addr(addr)
|
||||
{
|
||||
server->acceptor_loop.assertInLoop();
|
||||
auto orig_port = bind_addr.port();
|
||||
|
||||
if(server->canIPv6 && bind_addr.family()==AF_INET && bind_addr.isAny()) {
|
||||
// promote to IPv6 with IPv4 support
|
||||
bind_addr = SockAddr::any(AF_INET6, bind_addr.port());
|
||||
log_debug_printf(connsetup, "Promote 0.0.0.0 -> [::]%s", "\n");
|
||||
}
|
||||
|
||||
sock = evsocket(bind_addr.family(), SOCK_STREAM, 0);
|
||||
|
||||
if(evutil_make_listen_socket_reuseable(sock.sock))
|
||||
log_warn_printf(connsetup, "Unable to make socket reusable%s", "\n");
|
||||
|
||||
@@ -396,9 +403,12 @@ ServIface::ServIface(const std::string& addr, unsigned short port, server::Serve
|
||||
sock.bind(bind_addr);
|
||||
} catch(std::system_error& e) {
|
||||
if(fallback && e.code().value()==SOCK_EADDRINUSE) {
|
||||
log_debug_printf(connsetup, "Address %s in use", bind_addr.tostring().c_str());
|
||||
bind_addr.setPort(0);
|
||||
fallback = false;
|
||||
continue;
|
||||
}
|
||||
log_err_printf(connsetup, "Bind to %s fails", bind_addr.tostring().c_str());
|
||||
throw;
|
||||
}
|
||||
break;
|
||||
@@ -426,11 +436,6 @@ void ServIface::onConnS(struct evconnlistener *listener, evutil_socket_t sock, s
|
||||
{
|
||||
auto self = static_cast<ServIface*>(raw);
|
||||
try {
|
||||
if(peer->sa_family!=AF_INET) {
|
||||
log_crit_printf(connsetup, "Interface %s Rejecting !ipv4 client\n", self->name.c_str());
|
||||
evutil_closesocket(sock);
|
||||
return;
|
||||
}
|
||||
auto conn(std::make_shared<ServerConn>(self, sock, peer, socklen));
|
||||
self->server->connections[conn.get()] = std::move(conn);
|
||||
}catch(std::exception& e){
|
||||
|
||||
+4
-3
@@ -169,7 +169,7 @@ struct ServIface
|
||||
evsocket sock;
|
||||
evlisten listener;
|
||||
|
||||
ServIface(const std::string& addr, unsigned short port, server::Server::Pvt *server, bool fallback);
|
||||
ServIface(const SockAddr &addr, server::Server::Pvt *server, bool fallback);
|
||||
|
||||
static void onConnS(struct evconnlistener *listener, evutil_socket_t sock, struct sockaddr *peer, int socklen, void *raw);
|
||||
};
|
||||
@@ -200,6 +200,7 @@ using namespace impl;
|
||||
struct Server::Pvt
|
||||
{
|
||||
SockAttach attach;
|
||||
const bool canIPv6;
|
||||
|
||||
std::weak_ptr<Server::Pvt> internal_self;
|
||||
|
||||
@@ -218,13 +219,13 @@ struct Server::Pvt
|
||||
evbase acceptor_loop;
|
||||
|
||||
std::list<std::unique_ptr<UDPListener> > listeners;
|
||||
std::vector<SockAddr> beaconDest;
|
||||
std::vector<SockEndpoint> beaconDest;
|
||||
std::vector<SockAddr> ignoreList;
|
||||
|
||||
std::list<ServIface> interfaces;
|
||||
std::map<ServerConn*, std::shared_ptr<ServerConn> > connections;
|
||||
|
||||
evsocket beaconSender;
|
||||
evsocket beaconSender4, beaconSender6;
|
||||
evevent beaconTimer;
|
||||
|
||||
std::vector<uint8_t> searchReply;
|
||||
|
||||
+102
-77
@@ -36,9 +36,9 @@ struct UDPCollector : public UDPManager::Search,
|
||||
{
|
||||
UDPManager::Pvt* const manager;
|
||||
SockAddr bind_addr; // address our socket is bound to
|
||||
SockAddr lo_mcast_addr; // destination endpoint for local mcast forwarding
|
||||
SockEndpoint lo_mcast_addr; // destination endpoint for local mcast forwarding
|
||||
SockAddr lo_addr;
|
||||
std::set<std::pair<SockAddr, SockAddr>> mcast_grps; // mcast group+iface pairs which our socket has joined
|
||||
std::set<MCastMembership> mcast_grps; // mcast group+iface pairs which our socket has joined
|
||||
std::string name;
|
||||
evsocket sock;
|
||||
evevent rx;
|
||||
@@ -50,7 +50,7 @@ struct UDPCollector : public UDPManager::Search,
|
||||
|
||||
std::set<UDPListener*> listeners;
|
||||
|
||||
UDPCollector(UDPManager::Pvt* manager, uint16_t port);
|
||||
UDPCollector(UDPManager::Pvt* manager, int af, uint16_t port);
|
||||
~UDPCollector();
|
||||
|
||||
void addListener(UDPListener *l);
|
||||
@@ -93,13 +93,15 @@ public:
|
||||
struct UDPManager::Pvt {
|
||||
|
||||
evbase loop;
|
||||
IfaceMap ifmap;
|
||||
IfaceMap& ifmap;
|
||||
|
||||
// only manipulate from loop worker thread
|
||||
std::map<uint16_t, UDPCollector*> collectors;
|
||||
// key'd by address family and port#
|
||||
std::map<std::pair<int, uint16_t>, UDPCollector*> collectors;
|
||||
|
||||
Pvt()
|
||||
:loop("PVXUDP", epicsThreadPriorityCAServerLow-4)
|
||||
,ifmap(IfaceMap::instance())
|
||||
{}
|
||||
~Pvt()
|
||||
{
|
||||
@@ -107,12 +109,12 @@ struct UDPManager::Pvt {
|
||||
assert(collectors.empty());
|
||||
}
|
||||
|
||||
std::shared_ptr<UDPCollector> collect(const SockAddr& dest)
|
||||
std::shared_ptr<UDPCollector> collect(const SockEndpoint& dest)
|
||||
{
|
||||
std::shared_ptr<UDPCollector> collector;
|
||||
|
||||
if(dest.port()!=0) {
|
||||
auto it = collectors.find(dest.port());
|
||||
if(dest.addr.port()!=0) {
|
||||
auto it = collectors.find(std::make_pair(dest.addr.family(), dest.addr.port()));
|
||||
if(it!=collectors.end()) {
|
||||
try {
|
||||
collector = it->second->shared_from_this();
|
||||
@@ -123,26 +125,26 @@ struct UDPManager::Pvt {
|
||||
}
|
||||
|
||||
if(!collector) {
|
||||
collector.reset(new UDPCollector(this, dest.port()));
|
||||
collector.reset(new UDPCollector(this, dest.addr.family(), dest.addr.port()));
|
||||
}
|
||||
return collector;
|
||||
}
|
||||
};
|
||||
|
||||
UDPCollector::UDPCollector(UDPManager::Pvt *manager, uint16_t requested_port)
|
||||
UDPCollector::UDPCollector(UDPManager::Pvt *manager, int af, uint16_t requested_port)
|
||||
:manager(manager)
|
||||
,bind_addr(SockAddr::any(AF_INET, requested_port))
|
||||
,lo_mcast_addr(bind_addr.family(), "224.0.0.128")
|
||||
,bind_addr(SockAddr::any(af, requested_port))
|
||||
,lo_mcast_addr("224.0.0.128,1@127.0.0.1")
|
||||
,lo_addr(SockAddr::loopback(bind_addr.family()))
|
||||
,sock(bind_addr.family(), SOCK_DGRAM, 0)
|
||||
,sock(af, SOCK_DGRAM, 0)
|
||||
,rx(event_new(manager->loop.base, sock.sock, EV_READ|EV_PERSIST, &handle_static, this))
|
||||
,beaconMsg(src)
|
||||
{
|
||||
manager->loop.assertInLoop();
|
||||
|
||||
epicsSocketEnableAddressUseForDatagramFanout(sock.sock);
|
||||
enable_SO_RXQ_OVFL(sock.sock);
|
||||
enable_IP_PKTINFO(sock.sock);
|
||||
sock.enable_SO_RXQ_OVFL();
|
||||
sock.enable_IP_PKTINFO();
|
||||
|
||||
/* Always bind to wildcard to receive all uni/broad/multicast, and also to send them.
|
||||
* Notes:
|
||||
@@ -158,31 +160,32 @@ UDPCollector::UDPCollector(UDPManager::Pvt *manager, uint16_t requested_port)
|
||||
sock.bind(bind_addr);
|
||||
name = "UDP "+bind_addr.tostring();
|
||||
|
||||
lo_mcast_addr.setPort(bind_addr.port());
|
||||
lo_addr.setPort(bind_addr.port());
|
||||
if(af==AF_INET) {
|
||||
lo_mcast_addr.addr.setPort(bind_addr.port());
|
||||
lo_addr.setPort(bind_addr.port());
|
||||
|
||||
// join local group to receive
|
||||
sock.mcast_join(lo_mcast_addr, lo_addr);
|
||||
// setup for re-transmit
|
||||
sock.mcast_ttl(1); // make default explicit, we will only send to lo_mcast_addr.
|
||||
sock.mcast_loop(true);
|
||||
sock.mcast_iface(lo_addr);
|
||||
// join local group to receive
|
||||
auto Mem(lo_mcast_addr.resolve());
|
||||
sock.mcast_join(Mem);
|
||||
// setup for re-transmit
|
||||
sock.mcast_loop(true);
|
||||
|
||||
mcast_grps.emplace(lo_mcast_addr, lo_addr);
|
||||
mcast_grps.emplace(Mem);
|
||||
}
|
||||
|
||||
log_info_printf(logsetup, "Bound %d to %s as lo\n", sock.sock, name.c_str());
|
||||
|
||||
if(event_add(rx.get(), nullptr))
|
||||
throw std::runtime_error("Unable to create collector Rx event");
|
||||
|
||||
manager->collectors[bind_addr.port()] = this;
|
||||
manager->collectors[std::make_pair(af, bind_addr.port())] = this;
|
||||
}
|
||||
|
||||
UDPCollector::~UDPCollector()
|
||||
{
|
||||
manager->loop.assertInLoop();
|
||||
|
||||
manager->collectors.erase(bind_addr.port());
|
||||
manager->collectors.erase(std::make_pair(bind_addr.family(), bind_addr.port()));
|
||||
|
||||
// we should only be destroyed after that last listener has removed itself
|
||||
assert(listeners.empty());
|
||||
@@ -191,23 +194,24 @@ UDPCollector::~UDPCollector()
|
||||
|
||||
void UDPCollector::addListener(UDPListener *l)
|
||||
{
|
||||
for(const auto& mcast : l->mcasts) {
|
||||
const auto tup(std::make_pair(mcast, l->dest));
|
||||
if(mcast_grps.find(tup)==mcast_grps.end()) {
|
||||
mcast_grps.insert(tup);
|
||||
|
||||
log_debug_printf(logsetup, "collector joining %s on %s\n",
|
||||
mcast.tostring().c_str(),
|
||||
l->dest.tostring().c_str());
|
||||
|
||||
sock.mcast_join(mcast, l->dest);
|
||||
if(l->dest.addr.isMCast()) {
|
||||
l->cur = l->dest.resolve();
|
||||
auto it(mcast_grps.find(l->cur));
|
||||
if(it==mcast_grps.end() && sock.mcast_join(l->cur)) {
|
||||
mcast_grps.emplace(l->cur);
|
||||
log_debug_printf(logsetup, "collector joining %s\n",
|
||||
std::string(SB()<<l->dest).c_str());
|
||||
}
|
||||
}
|
||||
listeners.insert(l);
|
||||
|
||||
log_debug_printf(logsetup, "Start listening for UDP %s\n", std::string(SB()<<l->dest).c_str());
|
||||
}
|
||||
|
||||
void UDPCollector::delListener(UDPListener *l)
|
||||
{
|
||||
log_debug_printf(logsetup, "Stop listening for UDP %s\n", std::string(SB()<<l->dest).c_str());
|
||||
|
||||
listeners.erase(l);
|
||||
|
||||
// TODO: bother to cleanup mcast group membership?
|
||||
@@ -236,9 +240,7 @@ bool UDPCollector::handle_one()
|
||||
|
||||
if(nrx<0) {
|
||||
int err = evutil_socket_geterror(sock.sock);
|
||||
if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) {
|
||||
// nothing to do here
|
||||
} else {
|
||||
if(err!=SOCK_EWOULDBLOCK && err!=EAGAIN && err!=SOCK_EINTR) {
|
||||
log_warn_printf(logio, "UDP RX Error on %s : %s\n", name.c_str(),
|
||||
evutil_socket_error_to_string(err));
|
||||
}
|
||||
@@ -246,6 +248,15 @@ bool UDPCollector::handle_one()
|
||||
|
||||
}
|
||||
|
||||
if(dest.family()!=AF_UNSPEC)
|
||||
dest.setPort(bind_addr.port());
|
||||
|
||||
if(src.isMCast()) {
|
||||
// should never happen. It it does, we won't be tricked into amplifying a DDoS.
|
||||
log_debug_printf(logio, "Ignoring UDP with mcast source %s.\n", src.tostring().c_str());
|
||||
return true;
|
||||
}
|
||||
|
||||
log_hex_printf(logio, Level::Debug, rxbuf, nrx, "UDP %d Rx %d, %s -> %s @%u (%s)\n",
|
||||
sock.sock, nrx, src.tostring().c_str(), dest.tostring().c_str(), unsigned(rx.dstif), bind_addr.tostring().c_str());
|
||||
|
||||
@@ -280,8 +291,9 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
|
||||
switch(head.cmd) {
|
||||
|
||||
case CMD_SEARCH: {
|
||||
peerVersion = head.version;
|
||||
|
||||
uint8_t flags = 0;
|
||||
SockAddr replyAddr;
|
||||
uint16_t port = 0;
|
||||
|
||||
from_wire(M, searchID);
|
||||
@@ -292,16 +304,16 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
|
||||
M.skip(3, __FILE__, __LINE__); // unused/reserved
|
||||
|
||||
auto save_replyAddr = M.save();
|
||||
from_wire(M, replyAddr);
|
||||
from_wire(M, server);
|
||||
from_wire(M, port);
|
||||
if(replyAddr.isAny()) {
|
||||
replyAddr = src;
|
||||
if(server.isAny()) {
|
||||
server = src;
|
||||
if(origin==OriginTag) {
|
||||
log_err_printf(logio, "CMD_ORIGIN_TAG search with reply to sender never works%s", "\n");
|
||||
return;
|
||||
}
|
||||
}
|
||||
replyAddr.setPort(port);
|
||||
server.setPort(port);
|
||||
|
||||
if(M.good() && origin==Loopback && (flags&pva_search_flags::Unicast) && dest.family()!=AF_UNSPEC) {
|
||||
assert(buf==&this->buf[cmd_origin_tag_size]);
|
||||
@@ -310,7 +322,7 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
|
||||
// recipient of forwarded message must use, and trust, replyAddr in body :(
|
||||
{
|
||||
FixedBuf R(M.be, save_replyAddr, 16u);
|
||||
to_wire(R, replyAddr);
|
||||
to_wire(R, server);
|
||||
assert(R.good());
|
||||
}
|
||||
forwardM(dest, buf, nrx);
|
||||
@@ -318,18 +330,18 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
|
||||
}
|
||||
|
||||
// so far, only "tcp" transport has ever been seen.
|
||||
// however, we will consider and ignore any others which might appear
|
||||
bool foundtcp = false;
|
||||
// however, we will pass through others which might appear
|
||||
otherproto.clear();
|
||||
Size nproto{0};
|
||||
from_wire(M, nproto);
|
||||
for(size_t i=0; i<nproto.size && M.good(); i++) {
|
||||
Size nchar{0};
|
||||
from_wire(M, nchar);
|
||||
|
||||
// shortcut to avoid allocating a std::string
|
||||
// "tcp" is the only value we expect to see
|
||||
foundtcp |= M.size()>=3 && nchar.size==3 && M[0]=='t' && M[1]=='c' && M[2]=='p';
|
||||
M.skip(nchar.size, __FILE__, __LINE__);
|
||||
std::string prot;
|
||||
from_wire(M, prot);
|
||||
if(prot=="tcp") {
|
||||
protoTCP = true;
|
||||
} else {
|
||||
otherproto.push_back(prot);
|
||||
}
|
||||
}
|
||||
|
||||
// one Search message can include many PV names.
|
||||
@@ -348,14 +360,14 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
|
||||
from_wire(M, chlen);
|
||||
// inject nil for previous PV name
|
||||
*mundge = '\0';
|
||||
if(foundtcp && chlen.size<=M.size() && M.good()) {
|
||||
if(protoTCP && chlen.size<=M.size() && M.good()) {
|
||||
names.push_back(UDPManager::Search::Name{reinterpret_cast<const char*>(M.save()), id});
|
||||
}
|
||||
M.skip(chlen.size, __FILE__, __LINE__);
|
||||
}
|
||||
|
||||
// used by our reply()
|
||||
src = replyAddr;
|
||||
src = server;
|
||||
|
||||
if(M.good()) {
|
||||
// ensure nil for final PV name
|
||||
@@ -372,6 +384,8 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
|
||||
}
|
||||
|
||||
case CMD_BEACON: {
|
||||
beaconMsg.peerVersion = head.version;
|
||||
|
||||
uint16_t port = 0;
|
||||
|
||||
_from_wire<12>(M, &beaconMsg.guid[0], false, __FILE__, __LINE__);
|
||||
@@ -389,6 +403,8 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
|
||||
|
||||
if(M.good()) {
|
||||
for(auto L : listeners) {
|
||||
if(L->dest.addr.compare(dest)!=0)
|
||||
break; // TODO: check interface index against L->cur
|
||||
if(L->beaconCB) {
|
||||
(L->beaconCB)(beaconMsg);
|
||||
}
|
||||
@@ -406,7 +422,7 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
|
||||
// only accept when sent to the mcast address from the loopback address
|
||||
// since we only join the mcast group on loopback this will hopefully
|
||||
// frustrate attempts to inject CMD_ORIGIN_TAG externally.
|
||||
if(M.good() && origin==Loopback && dest.compare(lo_mcast_addr,false)==0 && src.isLO()) {
|
||||
if(M.good() && origin==Loopback && dest.compare(lo_mcast_addr.addr,false)==0 && src.isLO()) {
|
||||
originaddr.setPort(bind_addr.port());
|
||||
|
||||
process_one(originaddr, M.save(), M.size(), OriginTag);
|
||||
@@ -417,7 +433,7 @@ void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t
|
||||
originaddr.tostring().c_str(),
|
||||
M.good() ? 'T' : 'F',
|
||||
origin==Loopback ? 'T' : 'F',
|
||||
dest.compare(lo_mcast_addr,false)==0 ? 'T' : 'F',
|
||||
dest.compare(lo_mcast_addr.addr,false)==0 ? 'T' : 'F',
|
||||
src.isLO() ? 'T' : 'F');
|
||||
|
||||
break;
|
||||
@@ -445,7 +461,8 @@ void UDPCollector::forwardM(const SockAddr& origin, const uint8_t *pbuf, size_t
|
||||
assert(M.save()==&buf[cmd_origin_tag_size]);
|
||||
}
|
||||
|
||||
src = lo_mcast_addr;
|
||||
sock.mcast_prep_sendto(lo_mcast_addr);
|
||||
src = lo_mcast_addr.addr;
|
||||
reply(&buf[0], cmd_origin_tag_size+plen);
|
||||
}
|
||||
|
||||
@@ -518,7 +535,7 @@ void UDPManager::cleanup()
|
||||
udp_gbl = nullptr;
|
||||
}
|
||||
|
||||
std::unique_ptr<UDPListener> UDPManager::onBeacon(SockAddr& dest,
|
||||
std::unique_ptr<UDPListener> UDPManager::onBeacon(SockEndpoint &dest,
|
||||
std::function<void(const Beacon&)>&& cb)
|
||||
{
|
||||
if(!pvt)
|
||||
@@ -536,7 +553,16 @@ std::unique_ptr<UDPListener> UDPManager::onBeacon(SockAddr& dest,
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::unique_ptr<UDPListener> UDPManager::onSearch(SockAddr& dest,
|
||||
std::unique_ptr<UDPListener> UDPManager::onBeacon(SockAddr& dest,
|
||||
std::function<void(const Beacon&)>&& cb)
|
||||
{
|
||||
SockEndpoint ep(dest);
|
||||
auto ret(onBeacon(ep, std::move(cb)));
|
||||
dest = ep.addr;
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::unique_ptr<UDPListener> UDPManager::onSearch(SockEndpoint &dest,
|
||||
std::function<void(const Search&)>&& cb)
|
||||
{
|
||||
if(!pvt)
|
||||
@@ -554,6 +580,15 @@ std::unique_ptr<UDPListener> UDPManager::onSearch(SockAddr& dest,
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::unique_ptr<UDPListener> UDPManager::onSearch(SockAddr& dest,
|
||||
std::function<void(const Search&)>&& cb)
|
||||
{
|
||||
SockEndpoint ep(dest);
|
||||
auto ret(onSearch(ep, std::move(cb)));
|
||||
dest = ep.addr;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void UDPManager::sync()
|
||||
{
|
||||
if(!pvt)
|
||||
@@ -562,15 +597,16 @@ void UDPManager::sync()
|
||||
pvt->loop.sync();
|
||||
}
|
||||
|
||||
UDPListener::UDPListener(const std::shared_ptr<UDPManager::Pvt> &manager, SockAddr &dest)
|
||||
UDPListener::UDPListener(const std::shared_ptr<UDPManager::Pvt> &manager, SockEndpoint &ep)
|
||||
:manager(manager)
|
||||
,dest(dest)
|
||||
,collector(manager->collect(ep))
|
||||
,dest([&ep, this]() -> SockEndpoint{
|
||||
ep.addr.setPort(collector->bind_addr.port());
|
||||
return ep;
|
||||
}())
|
||||
,active(false)
|
||||
{
|
||||
manager->loop.assertInLoop();
|
||||
|
||||
collector = manager->collect(dest);
|
||||
dest.setPort(collector->bind_addr.port());
|
||||
}
|
||||
|
||||
UDPListener::~UDPListener()
|
||||
@@ -585,17 +621,6 @@ UDPListener::~UDPListener()
|
||||
});
|
||||
}
|
||||
|
||||
void UDPListener::addMCast(const SockAddr& mcast)
|
||||
{
|
||||
manager->loop.call([this, &mcast](){
|
||||
if(active)
|
||||
throw std::logic_error("must addMCast() before start()");
|
||||
|
||||
collector->mcast_grps.emplace(mcast.withPort(collector->bind_addr.port()),
|
||||
dest);
|
||||
});
|
||||
}
|
||||
|
||||
void UDPListener::start(bool s)
|
||||
{
|
||||
manager->loop.call([this, s](){
|
||||
|
||||
+11
-5
@@ -39,17 +39,23 @@ struct PVXS_API UDPManager
|
||||
std::string proto;
|
||||
SockAddr server;
|
||||
ServerGUID guid;
|
||||
uint8_t peerVersion;
|
||||
Beacon(const SockAddr& src) :src(src) {}
|
||||
};
|
||||
//! Create subscription for Beacon messages.
|
||||
//! Must call UDPListener::start()
|
||||
std::unique_ptr<UDPListener> onBeacon(SockEndpoint& dest,
|
||||
std::function<void(const Beacon&)>&& cb);
|
||||
std::unique_ptr<UDPListener> onBeacon(SockAddr& dest,
|
||||
std::function<void(const Beacon&)>&& cb);
|
||||
|
||||
struct PVXS_API Search {
|
||||
std::vector<std::string> otherproto; // any protocols other than "tcp"
|
||||
SockAddr src;
|
||||
SockAddr server;
|
||||
uint32_t searchID;
|
||||
uint8_t peerVersion;
|
||||
bool protoTCP; // included protocol "tcp"
|
||||
bool mustReply;
|
||||
struct Name {
|
||||
const char *name;
|
||||
@@ -66,6 +72,8 @@ struct PVXS_API UDPManager
|
||||
};
|
||||
//! Create subscription for Search messages.
|
||||
//! Must call UDPListener::start()
|
||||
std::unique_ptr<UDPListener> onSearch(SockEndpoint& dest,
|
||||
std::function<void(const Search&)>&& cb);
|
||||
std::unique_ptr<UDPListener> onSearch(SockAddr& dest,
|
||||
std::function<void(const Search&)>&& cb);
|
||||
|
||||
@@ -91,8 +99,8 @@ class PVXS_API UDPListener
|
||||
std::function<void(UDPManager::Beacon&)> beaconCB;
|
||||
const std::shared_ptr<UDPManager::Pvt> manager;
|
||||
std::shared_ptr<UDPCollector> collector;
|
||||
const SockAddr dest;
|
||||
std::set<SockAddr> mcasts;
|
||||
const SockEndpoint dest;
|
||||
MCastMembership cur;
|
||||
bool active;
|
||||
|
||||
INST_COUNTER(UDPListener);
|
||||
@@ -100,12 +108,10 @@ class PVXS_API UDPListener
|
||||
friend struct UDPCollector;
|
||||
friend struct UDPManager;
|
||||
|
||||
UDPListener(const std::shared_ptr<UDPManager::Pvt>& manager, SockAddr& dest);
|
||||
UDPListener(const std::shared_ptr<UDPManager::Pvt>& manager, SockEndpoint& dest);
|
||||
public:
|
||||
~UDPListener();
|
||||
|
||||
void addMCast(const SockAddr& mcast);
|
||||
|
||||
void start(bool s=true);
|
||||
inline void stop() { start(false); }
|
||||
};
|
||||
|
||||
@@ -50,6 +50,7 @@ void cleanup_for_valgrind()
|
||||
#endif
|
||||
impl::logger_shutdown();
|
||||
impl::UDPManager::cleanup();
|
||||
IfaceMap::cleanup();
|
||||
}
|
||||
|
||||
testCase::testCase()
|
||||
|
||||
+132
-23
@@ -258,8 +258,8 @@ SigInt::~SigInt()
|
||||
|
||||
|
||||
SockAddr::SockAddr(int af)
|
||||
:store{}
|
||||
{
|
||||
memset(&store, 0, sizeof(store));
|
||||
store.sa.sa_family = af;
|
||||
if(af!=AF_INET
|
||||
#ifdef AF_INET6
|
||||
@@ -269,21 +269,26 @@ SockAddr::SockAddr(int af)
|
||||
throw std::invalid_argument("Unsupported address family");
|
||||
}
|
||||
|
||||
SockAddr::SockAddr(int af, const char *address, unsigned short port)
|
||||
:SockAddr(af)
|
||||
SockAddr::SockAddr(const char *address, unsigned short port)
|
||||
:SockAddr(AF_UNSPEC)
|
||||
{
|
||||
setAddress(address, port);
|
||||
}
|
||||
|
||||
SockAddr::SockAddr(const sockaddr *addr, ev_socklen_t len)
|
||||
:SockAddr(addr->sa_family)
|
||||
SockAddr::SockAddr(const sockaddr *addr)
|
||||
:SockAddr(addr ? addr->sa_family : AF_UNSPEC)
|
||||
{
|
||||
if(len<0 || len>ev_socklen_t(size()))
|
||||
throw std::invalid_argument("Truncated Address");
|
||||
memcpy(&store, addr, len);
|
||||
if(!addr)
|
||||
return; // treat NULL as AF_UNSPEC
|
||||
|
||||
if(family()!=AF_UNSPEC && family()!=AF_INET && family()!=AF_INET6)
|
||||
throw std::invalid_argument("Unsupported address family");
|
||||
|
||||
if(family()!=AF_UNSPEC)
|
||||
memcpy(&store, addr, family()==AF_INET ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
|
||||
}
|
||||
|
||||
size_t SockAddr::size() const
|
||||
size_t SockAddr::size() const noexcept
|
||||
{
|
||||
switch(store.sa.sa_family) {
|
||||
case AF_INET: return sizeof(store.in);
|
||||
@@ -295,7 +300,7 @@ size_t SockAddr::size() const
|
||||
}
|
||||
}
|
||||
|
||||
unsigned short SockAddr::port() const
|
||||
unsigned short SockAddr::port() const noexcept
|
||||
{
|
||||
switch(store.sa.sa_family) {
|
||||
case AF_INET: return ntohs(store.in.sin_port);
|
||||
@@ -318,17 +323,97 @@ void SockAddr::setPort(unsigned short port)
|
||||
}
|
||||
}
|
||||
|
||||
void SockAddr::setAddress(const char *name, unsigned short port)
|
||||
void SockAddr::setAddress(const char *name, unsigned short defport)
|
||||
{
|
||||
SockAddr temp(AF_INET);
|
||||
if(aToIPAddr(name, port, &temp->in))
|
||||
throw std::runtime_error(std::string("Unable to parse as IP address: ")+name);
|
||||
if(temp.port()==0)
|
||||
temp.setPort(port);
|
||||
assert(name);
|
||||
// too bad evutil_parse_sockaddr_port() treats ":0" as an error...
|
||||
|
||||
/* looking for
|
||||
* [ipv6]:port
|
||||
* ipv6
|
||||
* [ipv6]
|
||||
* ipv4:port
|
||||
* ipv4
|
||||
*/
|
||||
// TODO: could optimize to find all of these with a single loop
|
||||
const char *firstc = strchr(name, ':'),
|
||||
*lastc = strrchr(name, ':'),
|
||||
*openb = strchr(name, '['),
|
||||
*closeb = strrchr(name, ']');
|
||||
|
||||
if(!openb ^ !closeb) {
|
||||
// '[' w/o ']' or vis. versa
|
||||
throw std::runtime_error(SB()<<"IPv6 with mismatched brackets \""<<escape(name)<<"\"");
|
||||
}
|
||||
|
||||
char scratch[INET6_ADDRSTRLEN+1];
|
||||
const char *addr, *port;
|
||||
SockAddr temp;
|
||||
void *sockaddr;
|
||||
|
||||
if(!firstc && !openb) {
|
||||
// no brackets or port.
|
||||
// plain ipv4
|
||||
addr = name;
|
||||
port = nullptr;
|
||||
temp->sa.sa_family = AF_INET;
|
||||
sockaddr = (void*)&temp->in.sin_addr.s_addr;
|
||||
|
||||
} else if(firstc && firstc==lastc && !openb) {
|
||||
// no bracket and only one ':'
|
||||
// ipv4 w/ port
|
||||
size_t addrlen = firstc-name;
|
||||
if(addrlen >= sizeof(scratch))
|
||||
throw std::runtime_error(SB()<<"IPv4 address too long \""<<escape(name)<<"\"");
|
||||
|
||||
memcpy(scratch, name, addrlen);
|
||||
scratch[addrlen] = '\0';
|
||||
addr = scratch;
|
||||
port = lastc+1;
|
||||
temp->sa.sa_family = AF_INET;
|
||||
sockaddr = (void*)&temp->in.sin_addr.s_addr;
|
||||
|
||||
} else if(firstc && firstc!=lastc && !openb) {
|
||||
// no bracket and more than one ':'
|
||||
// bare ipv6
|
||||
addr = name;
|
||||
port = nullptr;
|
||||
temp->sa.sa_family = AF_INET6;
|
||||
sockaddr = (void*)&temp->in6.sin6_addr;
|
||||
|
||||
} else if(openb) {
|
||||
// brackets
|
||||
// ipv6, maybe with port
|
||||
size_t addrlen = closeb-openb-1u;
|
||||
if(addrlen >= sizeof(scratch))
|
||||
throw std::runtime_error(SB()<<"IPv6 address too long \""<<escape(name)<<"\"");
|
||||
|
||||
memcpy(scratch, openb+1, addrlen);
|
||||
scratch[addrlen] = '\0';
|
||||
addr = scratch;
|
||||
if(lastc > closeb)
|
||||
port = lastc+1;
|
||||
else
|
||||
port = nullptr;
|
||||
temp->sa.sa_family = AF_INET6;
|
||||
sockaddr = (void*)&temp->in6.sin6_addr;
|
||||
|
||||
} else {
|
||||
throw std::runtime_error(SB()<<"Invalid IP address form \""<<escape(name)<<"\"");
|
||||
}
|
||||
|
||||
if(evutil_inet_pton(temp->sa.sa_family, addr, sockaddr)<=0)
|
||||
throw std::runtime_error(SB()<<"Not a valid IP address \""<<escape(name)<<"\"");
|
||||
|
||||
if(port)
|
||||
temp.setPort(parseTo<uint64_t>(port));
|
||||
else
|
||||
temp.setPort(defport);
|
||||
|
||||
(*this) = temp;
|
||||
}
|
||||
|
||||
bool SockAddr::isAny() const
|
||||
bool SockAddr::isAny() const noexcept
|
||||
{
|
||||
switch(store.sa.sa_family) {
|
||||
case AF_INET: return store.in.sin_addr.s_addr==htonl(INADDR_ANY);
|
||||
@@ -339,7 +424,7 @@ bool SockAddr::isAny() const
|
||||
}
|
||||
}
|
||||
|
||||
bool SockAddr::isLO() const
|
||||
bool SockAddr::isLO() const noexcept
|
||||
{
|
||||
switch(store.sa.sa_family) {
|
||||
case AF_INET: return store.in.sin_addr.s_addr==htonl(INADDR_LOOPBACK);
|
||||
@@ -350,10 +435,10 @@ bool SockAddr::isLO() const
|
||||
}
|
||||
}
|
||||
|
||||
bool SockAddr::isMCast() const
|
||||
bool SockAddr::isMCast() const noexcept
|
||||
{
|
||||
switch(store.sa.sa_family) {
|
||||
case AF_INET: return IN_MULTICAST(store.in.sin_addr.s_addr);
|
||||
case AF_INET: return IN_MULTICAST(ntohl(store.in.sin_addr.s_addr));
|
||||
#ifdef AF_INET6
|
||||
case AF_INET6: return IN6_IS_ADDR_MULTICAST(&store.in6.sin6_addr);
|
||||
#endif
|
||||
@@ -361,6 +446,27 @@ bool SockAddr::isMCast() const
|
||||
}
|
||||
}
|
||||
|
||||
SockAddr SockAddr::map4to6() const
|
||||
{
|
||||
SockAddr ret;
|
||||
if(family()==AF_INET) {
|
||||
static_assert (sizeof(ret->in6.sin6_addr)==16, "");
|
||||
ret->in6.sin6_family = AF_INET6;
|
||||
ret->in6.sin6_addr.s6_addr[10] = 0xff;
|
||||
ret->in6.sin6_addr.s6_addr[11] = 0xff;
|
||||
memcpy(&ret->in6.sin6_addr.s6_addr[12], &store.in.sin_addr.s_addr, 4u);
|
||||
|
||||
ret->in6.sin6_port = store.in.sin_port;
|
||||
|
||||
} else if(family()==AF_INET6) {
|
||||
ret = *this;
|
||||
|
||||
} else {
|
||||
throw std::logic_error("Invalid address family");
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::string SockAddr::tostring() const
|
||||
{
|
||||
std::ostringstream strm;
|
||||
@@ -428,12 +534,15 @@ std::ostream& operator<<(std::ostream& strm, const SockAddr& addr)
|
||||
char buf[INET6_ADDRSTRLEN+1];
|
||||
if(evutil_inet_ntop(AF_INET6, &addr->in6.sin6_addr, buf, sizeof(buf))) {
|
||||
buf[sizeof(buf)-1] = '\0'; // paranoia
|
||||
strm<<'['<<buf<<']';
|
||||
|
||||
} else {
|
||||
strm<<"<\?\?\?>";
|
||||
}
|
||||
strm<<buf;
|
||||
if(ntohs(addr->in6.sin6_port))
|
||||
strm<<':'<<ntohs(addr->in6.sin6_port);
|
||||
if(addr->in6.sin6_scope_id)
|
||||
strm<<"%"<<addr->in6.sin6_scope_id;
|
||||
if(auto port = ntohs(addr->in6.sin6_port))
|
||||
strm<<':'<<port;
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
|
||||
+1
-1
@@ -56,7 +56,7 @@ struct SB {
|
||||
};
|
||||
|
||||
|
||||
void threadOnce(epicsThreadOnceId *id, EPICSTHREADFUNC fn, void *arg);
|
||||
void threadOnce(epicsThreadOnceId *id, EPICSTHREADFUNC fn, void *arg=nullptr);
|
||||
|
||||
namespace idetail {
|
||||
template <typename I>
|
||||
|
||||
+51
-34
@@ -7,6 +7,7 @@
|
||||
#include <osiSockExt.h>
|
||||
|
||||
#include <cstring>
|
||||
#include <system_error>
|
||||
|
||||
#include <epicsUnitTest.h>
|
||||
#include <testMain.h>
|
||||
@@ -36,35 +37,49 @@ void test_ifacemap()
|
||||
{
|
||||
testDiag("Enter %s", __func__);
|
||||
|
||||
impl::IfaceMap ifs;
|
||||
ifs.refresh();
|
||||
auto& ifs = IfaceMap::instance();
|
||||
|
||||
testFalse(ifs.info.empty())<<" found "<<ifs.info.size()<<" interfaces";
|
||||
epicsGuard<epicsMutex> G(ifs.lock); // since we are playing around with the internals...
|
||||
|
||||
ifs.refresh(true);
|
||||
|
||||
testFalse(ifs.byIndex.empty())<<" found "<<ifs.byIndex.size()<<" interfaces";
|
||||
|
||||
bool foundlo = false;
|
||||
const auto lo(SockAddr::loopback(AF_INET));
|
||||
|
||||
for(const auto& iface : ifs.info) {
|
||||
for(const auto& addr : iface.second) {
|
||||
if(addr!=lo)
|
||||
for(const auto& pair : ifs.byIndex) {
|
||||
auto& iface = pair.second;
|
||||
testDiag("Interface %u \"%s\"", unsigned(iface.index), iface.name.c_str());
|
||||
for(const auto& pair : iface.addrs) {
|
||||
testDiag(" Address %s/%s", pair.first.tostring().c_str(), pair.second.tostring().c_str());
|
||||
if(pair.first!=lo)
|
||||
continue;
|
||||
testTrue(!foundlo)<<" Found loopback with index "<<iface.first;
|
||||
testTrue(!foundlo)<<" Found loopback with index "<<iface.index;
|
||||
foundlo = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void test_udp()
|
||||
void test_udp(int af)
|
||||
{
|
||||
testDiag("Enter %s", __func__);
|
||||
testDiag("Enter %s(%d)", __func__, af);
|
||||
|
||||
evsocket A(AF_INET, SOCK_DGRAM, 0),
|
||||
B(AF_INET, SOCK_DGRAM, 0);
|
||||
evsocket A(af, SOCK_DGRAM, 0),
|
||||
B(af, SOCK_DGRAM, 0);
|
||||
|
||||
SockAddr bind_addr(SockAddr::loopback(AF_INET));
|
||||
SockAddr bind_addr(SockAddr::loopback(af));
|
||||
|
||||
enable_IP_PKTINFO(A.sock);
|
||||
A.bind(bind_addr);
|
||||
A.enable_IP_PKTINFO();
|
||||
try{
|
||||
A.bind(bind_addr);
|
||||
}catch(std::system_error& e){
|
||||
if(af==AF_INET6 && e.code().value()==SOCK_EADDRNOTAVAIL) {
|
||||
testSkip(7, "No runtime IPv6 support");
|
||||
return;
|
||||
}
|
||||
testAbort("Unable to bind %s : (%d) %s", bind_addr.tostring().c_str(), e.code().value(), e.what());
|
||||
}
|
||||
testNotEq(bind_addr.port(), 0)<<"bound port";
|
||||
|
||||
SockAddr send_addr(bind_addr);
|
||||
@@ -84,7 +99,7 @@ void test_udp()
|
||||
testDiag("Call recvfrom()");
|
||||
ret = recvfromx{A.sock, (char*)rxbuf, sizeof(rxbuf), &src, &dest}.call();
|
||||
// only the destination address is captured, not the port
|
||||
if(dest.family()==AF_INET)
|
||||
if(dest.family()!=AF_UNSPEC)
|
||||
dest.setPort(bind_addr.port());
|
||||
|
||||
testOk(ret==4 && rxbuf[0]==0x12 && rxbuf[1]==0x34 && rxbuf[2]==0x56 && rxbuf[3]==0x78,
|
||||
@@ -102,28 +117,27 @@ void test_local_mcast()
|
||||
evsocket A(AF_INET, SOCK_DGRAM, 0),
|
||||
B(AF_INET, SOCK_DGRAM, 0);
|
||||
|
||||
SockAddr mcast_addr(AF_INET, "224.0.0.128");
|
||||
SockEndpoint mcast_addr("224.0.0.128,1@127.0.0.1");
|
||||
|
||||
// We could bind to mcast_addr on all targets except WIN32
|
||||
SockAddr bind_addr(SockAddr::any(AF_INET));
|
||||
|
||||
enable_IP_PKTINFO(A.sock);
|
||||
A.enable_IP_PKTINFO();
|
||||
A.bind(bind_addr);
|
||||
mcast_addr.setPort(bind_addr.port());
|
||||
mcast_addr.addr.setPort(bind_addr.port());
|
||||
|
||||
SockAddr sender_addr(SockAddr::loopback(AF_INET));
|
||||
B.bind(sender_addr);
|
||||
|
||||
// receiving socket joins on the loopback interface
|
||||
A.mcast_join(mcast_addr, sender_addr); // ignores port(s)
|
||||
A.mcast_join(mcast_addr.resolve()); // ignores port(s)
|
||||
|
||||
// sending socket targets the loopback interface
|
||||
B.mcast_iface(sender_addr); // ignores port(s)
|
||||
B.mcast_ttl(1);
|
||||
B.mcast_prep_sendto(mcast_addr); // ignores port(s)
|
||||
B.mcast_loop(true);
|
||||
|
||||
uint8_t msg[] = {0x12, 0x34, 0x56, 0x78};
|
||||
int ret = sendto(B.sock, (char*)msg, sizeof(msg), 0, &mcast_addr->sa, mcast_addr.size());
|
||||
int ret = sendto(B.sock, (char*)msg, sizeof(msg), 0, &mcast_addr.addr->sa, mcast_addr.addr.size());
|
||||
testEq(ret, (int)sizeof(msg))<<"Send test";
|
||||
|
||||
uint8_t rxbuf[8] = {};
|
||||
@@ -134,7 +148,7 @@ void test_local_mcast()
|
||||
recvfromx rx{A.sock, (char*)rxbuf, sizeof(rxbuf), &src, &dest};
|
||||
ret = rx.call();
|
||||
if(dest.family()==AF_INET)
|
||||
dest.setPort(mcast_addr.port());
|
||||
dest.setPort(mcast_addr.addr.port());
|
||||
|
||||
testTrue(ret>=0 && rx.dstif>0 && ifinfo.has_address(rx.dstif, sender_addr))
|
||||
<<" received on index "<<rx.dstif;
|
||||
@@ -143,14 +157,14 @@ void test_local_mcast()
|
||||
"Recv'd %d [%u, %u, %u, %u]", ret, rxbuf[0], rxbuf[1], rxbuf[2], rxbuf[3]);
|
||||
|
||||
testEq(src, sender_addr);
|
||||
testEq(dest, mcast_addr);
|
||||
testEq(dest, mcast_addr.addr);
|
||||
}
|
||||
|
||||
void test_mcast_scope()
|
||||
{
|
||||
testDiag("Enter %s", __func__);
|
||||
|
||||
SockAddr mcast_addr(AF_INET, "224.0.0.128");
|
||||
SockEndpoint mcast_addr("224.0.0.128,1@127.0.0.1");
|
||||
auto any(SockAddr::any(AF_INET));
|
||||
auto lo(SockAddr::loopback(AF_INET));
|
||||
auto sender(SockAddr::loopback(AF_INET));
|
||||
@@ -167,9 +181,7 @@ void test_mcast_scope()
|
||||
epicsSocketEnableAddressUseForDatagramFanout(RX4.sock);
|
||||
|
||||
TX.mcast_loop(true);
|
||||
TX.mcast_ttl(1u);
|
||||
// endure message goes out through LO
|
||||
TX.mcast_iface(lo);
|
||||
TX.mcast_prep_sendto(mcast_addr);
|
||||
TX.bind(sender);
|
||||
testShow()<<" sender bound to "<<sender;
|
||||
|
||||
@@ -177,7 +189,7 @@ void test_mcast_scope()
|
||||
// by winsock bind() documentation
|
||||
|
||||
RX1.bind(any);
|
||||
mcast_addr.setPort(any.port()); // bind all RX* to the same port
|
||||
mcast_addr.addr.setPort(any.port()); // bind all RX* to the same port
|
||||
lo.setPort(any.port());
|
||||
testShow()<<" RX1 bound to "<<any;
|
||||
RX2.bind(any);
|
||||
@@ -186,17 +198,17 @@ void test_mcast_scope()
|
||||
testShow()<<" RX3 bound to "<<lo;
|
||||
#ifndef _WIN32
|
||||
// winsock doesn't allow binding to an mcast address
|
||||
RX4.bind(mcast_addr);
|
||||
RX4.bind(mcast_addr.addr);
|
||||
testShow()<<" RX4 bound to "<<mcast_addr;
|
||||
#endif
|
||||
|
||||
testShow()<<" Join RX1 to "<<mcast_addr<<" on "<<lo;
|
||||
RX1.mcast_join(mcast_addr, lo);
|
||||
RX1.mcast_join(mcast_addr.resolve());
|
||||
|
||||
const char msg[] = "hello world!";
|
||||
auto msglen = sizeof(msg)-1u;
|
||||
|
||||
auto ret = sendto(TX.sock, msg, msglen, 0, &mcast_addr->sa, lo.size());
|
||||
auto ret = sendto(TX.sock, msg, msglen, 0, &mcast_addr.addr->sa, mcast_addr.addr.size());
|
||||
testEq(ret, int(msglen))<<" sendto("<<sender<<" -> "<<mcast_addr<<") err="<<EVUTIL_SOCKET_ERROR();
|
||||
|
||||
auto doRX = [&lo, &msg, msglen](unsigned idx, evsocket& sock, bool expectrx) {
|
||||
@@ -342,10 +354,15 @@ MAIN(testsock)
|
||||
{
|
||||
SockAttach attach;
|
||||
logger_config_env();
|
||||
testPlan(51);
|
||||
testPlan(58);
|
||||
testSetup();
|
||||
test_ifacemap();
|
||||
test_udp();
|
||||
test_udp(AF_INET);
|
||||
try{
|
||||
test_udp(AF_INET6);
|
||||
}catch(std::exception&e){
|
||||
testAbort("test_udp6: %s", e.what());
|
||||
}
|
||||
test_local_mcast();
|
||||
test_mcast_scope();
|
||||
test_from_wire();
|
||||
|
||||
Reference in New Issue
Block a user