Portable capture of destination interface and IP address

Use recvmsg() and control messages IP_PKTINFO (Linux, OSX, Windows)
or IP_ORIGDSTADDR and IP_RECVIF (RTEMS w/ libbsd) to find the
index of the logical interface through which a UDP packet was received,
and the destination address from the IPv4 header.

Also, clear IP_MULTICAST_ALL on Linux to disable non-compliant legacy brokenness
and receive only those multicasts we are interested in.
This commit is contained in:
Michael Davidsaver
2021-07-26 10:13:40 -07:00
parent fe6974025a
commit f67f27e96b
12 changed files with 723 additions and 142 deletions
+5
View File
@@ -546,6 +546,11 @@ def define_DSOS(self):
src_pvxs = [os.path.join('src', src) for src in src_pvxs]
if OS_CLASS=='WIN32':
src_pvxs += ['src/os/WIN32/osdSockExt.cpp']
else:
src_pvxs += ['src/os/default/osdSockExt.cpp']
event_libs = []
if OS_CLASS=='WIN32':
event_libs = ['ws2_32','shell32','advapi32','bcrypt','iphlpapi']
+2
View File
@@ -86,6 +86,8 @@ LIB_SRCS += nt.cpp
LIB_SRCS += evhelper.cpp
LIB_SRCS += udp_collector.cpp
LIB_SRCS += osdSockExt.cpp
LIB_SRCS += config.cpp
LIB_SRCS += conn.cpp
+5 -6
View File
@@ -750,14 +750,13 @@ bool ContextImpl::onSearch()
{
searchMsg.resize(0x10000);
SockAddr src;
uint32_t ndrop = 0u;
osiSocklen_t alen = src.size();
const int nrx = recvfromx(searchTx.sock, (char*)&searchMsg[0], searchMsg.size()-1, &src->sa, &alen, &ndrop);
recvfromx rx{searchTx.sock, (char*)&searchMsg[0], searchMsg.size()-1, &src};
const int nrx = rx.call();
if(nrx>=0 && ndrop!=0 && prevndrop!=ndrop) {
log_debug_printf(io, "UDP search reply buffer overflow %u -> %u\n", unsigned(prevndrop), unsigned(ndrop));
prevndrop = ndrop;
if(nrx>=0 && rx.ndrop!=0 && prevndrop!=rx.ndrop) {
log_debug_printf(io, "UDP search reply buffer overflow %u -> %u\n", unsigned(prevndrop), unsigned(rx.ndrop));
prevndrop = rx.ndrop;
}
if(nrx<0) {
+66 -3
View File
@@ -20,7 +20,6 @@
#include <event2/thread.h>
#include <errlog.h>
#include <osiSock.h>
#include <epicsEvent.h>
#include <epicsThread.h>
#include <epicsExit.h>
@@ -44,6 +43,7 @@ namespace pvxs {namespace impl {
DEFINE_LOGGER(logerr, "pvxs.loop");
DEFINE_LOGGER(logtimer, "pvxs.timer");
DEFINE_LOGGER(logiface, "pvxs.iface");
namespace mdetail {
VFunctor0::~VFunctor0() {}
@@ -367,8 +367,16 @@ bool evbase::assertInRunningLoop() const
evsocket::evsocket(evutil_socket_t sock)
:sock(sock)
{
if(sock==evutil_socket_t(-1))
throw std::bad_alloc();
if(sock==evutil_socket_t(-1)) {
int err = SOCKERRNO;
#ifdef _WIN32
if(err==WSANOTINITIALISED) {
throw std::runtime_error("WSANOTINITIALISED");
}
#endif
(void)err;
throw std::runtime_error("Unable to allocate socket");
}
evutil_make_socket_closeonexec(sock);
@@ -386,6 +394,19 @@ evsocket::evsocket(evutil_socket_t sock)
evsocket::evsocket(int af, int type, int proto)
:evsocket(socket(af, type | SOCK_CLOEXEC, proto))
{
#ifdef __linux__
# ifndef IP_MULTICAST_ALL
# define IP_MULTICAST_ALL 49
# endif
// Disable non-compliant legacy behavior of Linux IP stack
if(af==AF_INET && type==SOCK_DGRAM){
int val = 0;
if(setsockopt(sock, IPPROTO_IP, IP_MULTICAST_ALL, (char*)&val, sizeof(val))) {
log_warn_printf(logerr, "Unable to clear IP_MULTICAST_ALL (err=%d). This may cause problems on multi-homed hosts.\n",
evutil_socket_geterror(sock));
}
}
#endif
}
evsocket::evsocket(evsocket&& o) noexcept
@@ -511,6 +532,48 @@ std::vector<SockAddr> evsocket::broadcasts(const SockAddr* match) const
return ret;
}
#if EPICS_VERSION_INT<VERSION_INT(7,0,3,1)
# define getMonotonic getCurrent
#endif
IfaceMap::IfaceMap()
{
refresh();
updated = epicsTime::getMonotonic();
}
bool IfaceMap::has_address(int64_t ifindex, const SockAddr &addr)
{
if(addr.isAny())
return true;
auto now(epicsTime::getMonotonic());
auto age = now-updated;
bool first = true;
retry:
if(!first || age > 60) {
refresh();
updated = now;
} else {
log_debug_printf(logiface, "using cache age %.2f sec\n", age);
}
auto ifit(info.find(ifindex));
if(ifit!=info.end()) {
const auto& iface = ifit->second;
auto adit(iface.find(addr));
return adit!=iface.end();
}
if(first) {
// re-try once
first = false;
goto retry;
}
log_warn_printf(logiface, "Encountered unknown interface index %lld\n", (long long)ifindex);
return false;
}
void to_wire(Buffer& buf, const SockAddr& val)
{
if(!buf.ensure(16)) {
+16
View File
@@ -11,6 +11,8 @@
#include <functional>
#include <memory>
#include <string>
#include <map>
#include <set>
#include <event2/event.h>
#include <event2/buffer.h>
@@ -20,6 +22,8 @@
#include <pvxs/version.h>
#include <utilpvt.h>
#include <epicsTime.h>
#include "pvaproto.h"
// hooks for std::unique_ptr
@@ -230,6 +234,18 @@ struct PVXS_API evsocket
std::vector<SockAddr> broadcasts(const SockAddr* match=nullptr) const;
};
struct PVXS_API IfaceMap {
IfaceMap();
// return true if ifindex is valid, and addr is one of the addresses currently assigned to it.
bool has_address(int64_t ifindex, const SockAddr& addr);
void refresh();
std::map<int64_t, std::set<SockAddr, SockAddrOnlyLess>> info;
epicsTime updated;
};
} // namespace impl
+159
View File
@@ -0,0 +1,159 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include <winsock2.h>
#include <iphlpapi.h>
#include "osiSockExt.h"
#include <mswsock.h>
#include <vector>
#include <pvxs/log.h>
#include "evhelper.h"
#include <epicsThread.h>
#include <cantProceed.h>
namespace pvxs {
DEFINE_LOGGER(log, "pvxs.util");
DEFINE_LOGGER(logiface, "pvxs.iface");
static
LPFN_WSARECVMSG WSARecvMsg;
static
epicsThreadOnceId oseOnce = EPICS_THREAD_ONCE_INIT;
static
void oseDoOnce(void*)
{
evsocket dummy(AF_INET, SOCK_DGRAM, 0);
GUID guid = WSAID_WSARECVMSG;
DWORD nout;
if(WSAIoctl(dummy.sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guid, sizeof(guid),
&WSARecvMsg, sizeof(WSARecvMsg),
&nout, nullptr, nullptr))
{
cantProceed("Unable to get &WSARecvMsg: %d", WSAGetLastError());
}
if(!WSARecvMsg)
cantProceed("Unable to get &WSARecvMsg!!");
}
void osiSockAttachExt()
{
osiSockAttach();
epicsThreadOnce(&oseOnce, &oseDoOnce, nullptr);
}
void enable_SO_RXQ_OVFL(SOCKET sock) {}
void enable_IP_PKTINFO(SOCKET sock)
{
int val = 1;
if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val)))
log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO);
}
int recvfromx::call()
{
ndrop = 0u;
dstif = -1;
WSAMSG msg{};
WSABUF iov = {(ULONG)buflen, (char*)buf};
msg.lpBuffers = &iov;
msg.dwBufferCount = 1u;
msg.name = &(*src)->sa;
msg.namelen = src->size();
alignas (alignof (WSACMSGHDR)) char cbuf[WSA_CMSG_SPACE(sizeof(in_pktinfo))];
msg.Control = {sizeof(cbuf), cbuf};
DWORD nrx=0u;
if(!WSARecvMsg(sock, &msg, &nrx, nullptr, nullptr)) {
if(msg.dwFlags & MSG_CTRUNC)
log_debug_printf(log, "MSG_CTRUNC %zu, %zu\n", msg.Control.len, sizeof(cbuf));
for(WSACMSGHDR *hdr = WSA_CMSG_FIRSTHDR(&msg); hdr ; hdr = WSA_CMSG_NXTHDR(&msg, hdr)) {
if(hdr->cmsg_level==IPPROTO_IP && hdr->cmsg_type==IP_PKTINFO && hdr->cmsg_len>=WSA_CMSG_LEN(sizeof(in_pktinfo))) {
if(dst) {
(*dst)->in.sin_family = AF_INET;
memcpy(&(*dst)->in.sin_addr, WSA_CMSG_DATA(hdr) + offsetof(in_pktinfo, ipi_addr), sizeof(IN_ADDR));
}
decltype(in_pktinfo::ipi_ifindex) idx;
memcpy(&idx, WSA_CMSG_DATA(hdr) + offsetof(in_pktinfo, ipi_ifindex), sizeof(idx));
dstif = idx;
}
}
return nrx;
} else {
return -1;
}
}
namespace impl {
#ifndef GAA_FLAG_INCLUDE_ALL_INTERFACES
# define GAA_FLAG_INCLUDE_ALL_INTERFACES 0
#endif
void IfaceMap::refresh() {
std::vector<char> ifaces(1024u);
decltype (info) temp;
{
constexpr ULONG flags = GAA_FLAG_SKIP_ANYCAST|GAA_FLAG_SKIP_MULTICAST|GAA_FLAG_SKIP_DNS_SERVER|GAA_FLAG_INCLUDE_ALL_INTERFACES;
ULONG buflen = ifaces.size();
auto err = GetAdaptersAddresses(AF_INET, flags, 0, reinterpret_cast<IP_ADAPTER_ADDRESSES*>(ifaces.data()), &buflen);
if(err == ERROR_BUFFER_OVERFLOW) {
// buflen updated with necessary length, retry
ifaces.resize(buflen);
err = GetAdaptersAddresses(AF_INET, flags, 0, reinterpret_cast<IP_ADAPTER_ADDRESSES*>(ifaces.data()), &buflen);
}
if(err) {
log_warn_printf(logiface, "Unable to GetAdaptersAddresses() error=%lld\n", (unsigned long long)err);
return;
}
}
for(auto iface = reinterpret_cast<const IP_ADAPTER_ADDRESSES*>(ifaces.data()); iface ; iface = iface->Next) {
auto& info = temp[iface->IfIndex];
//TODO: any flags to check?
for(auto addr = iface->FirstUnicastAddress; addr; addr = addr->Next) {
if(addr->Address.lpSockaddr->sa_family!=AF_INET)
continue;
auto pair = info.emplace(addr->Address.lpSockaddr, sizeof(sockaddr_in));
log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %s\n",
(long long)iface->IfIndex, iface->AdapterName, pair.first->tostring().c_str());
}
}
info.swap(temp);
}
} // namespace impl
} // namespace pvxs
+196
View File
@@ -0,0 +1,196 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#include "osiSockExt.h"
#include <string.h>
#include <sys/types.h>
#include <net/if.h>
#include <ifaddrs.h>
#ifdef __rtems__
// missing extern C circa RTEMS 5.1
extern "C" {
# include <net/if_dl.h>
}
#endif
#include <pvxs/log.h>
#include <evhelper.h>
namespace pvxs {
DEFINE_LOGGER(log, "pvxs.util");
DEFINE_LOGGER(logiface, "pvxs.iface");
void osiSockAttachExt() {}
void enable_SO_RXQ_OVFL(SOCKET sock)
{
#ifdef SO_RXQ_OVFL
// Linux specific feature exposes OS dropped packet count
int val = 1;
if(setsockopt(sock, SOL_SOCKET, SO_RXQ_OVFL, (char*)&val, sizeof(val)))
log_warn_printf(log, "Unable to set SO_RXQ_OVFL: %d\n", SOCKERRNO);
#endif
}
void enable_IP_PKTINFO(SOCKET sock)
{
/* linux, some *BSD's (OSX), and winsock package both destination address (from ip header)
* and receiving interface index (from host) into one IP_PKTINFO control message.
* Remaining *BSD's can deliver these in separate IP_ORIGDSTADDR and IP_RECVIF messages.
*/
#ifdef IP_PKTINFO
int val = 1;
if(setsockopt(sock, IPPROTO_IP, IP_PKTINFO, (char*)&val, sizeof(val)))
log_warn_printf(log, "Unable to set IP_PKTINFO: %d\n", SOCKERRNO);
#else
# ifdef IP_ORIGDSTADDR
{
int val = 1;
if(setsockopt(sock, IPPROTO_IP, IP_ORIGDSTADDR, (char*)&val, sizeof(val)))
log_warn_printf(log, "Unable to set IP_ORIGDSTADDR: %d\n", SOCKERRNO);
}
# endif
# ifdef IP_RECVIF
{
int val = 1;
if(setsockopt(sock, IPPROTO_IP, IP_RECVIF, (char*)&val, sizeof(val)))
log_warn_printf(log, "Unable to set IP_RECVIF: %d\n", SOCKERRNO);
}
# endif
#endif
}
int recvfromx::call()
{
msghdr msg{};
iovec iov = {buf, buflen};
msg.msg_iov = &iov;
msg.msg_iovlen = 1u;
msg.msg_name = &(*src)->sa;
msg.msg_namelen = src ? src->size() : 0u;
alignas (alignof (cmsghdr)) char cbuf[0u
#ifdef SO_RXQ_OVFL
+ CMSG_SPACE(sizeof(ndrop))
#endif
#ifdef IP_PKTINFO
+ CMSG_SPACE(sizeof(in_pktinfo))
#else
# if defined(IP_ORIGDSTADDR)
+ CMSG_SPACE(sizeof(sockaddr_in))
# endif
# if defined(IP_RECVIF)
+ CMSG_SPACE(sizeof(sockaddr_dl))
# endif
#endif
];
msg.msg_control = cbuf;
msg.msg_controllen = sizeof(cbuf);
if(dst)
*dst = SockAddr();
dstif = -1;
ndrop = 0u;
int ret = recvmsg(sock, &msg, 0);
if(ret>=0) { // on success, check for control messages
if(msg.msg_flags & MSG_CTRUNC)
log_warn_printf(log, "MSG_CTRUNC, expand buffer %zu <- %zu\n", msg.msg_controllen, sizeof(cbuf));
for(cmsghdr *hdr = CMSG_FIRSTHDR(&msg); hdr ; hdr = CMSG_NXTHDR(&msg, hdr)) {
if(0) {}
#ifdef SO_RXQ_OVFL
else if(hdr->cmsg_level==SOL_SOCKET && hdr->cmsg_type==SO_RXQ_OVFL && hdr->cmsg_len>=CMSG_LEN(sizeof(ndrop))) {
memcpy(&ndrop, CMSG_DATA(hdr), sizeof(ndrop));
}
#endif
#ifdef IP_PKTINFO
else if(hdr->cmsg_level==IPPROTO_IP && hdr->cmsg_type==IP_PKTINFO && hdr->cmsg_len>=CMSG_LEN(sizeof(in_pktinfo))) {
if(dst) {
(*dst)->in.sin_family = AF_INET;
memcpy(&(*dst)->in.sin_addr, CMSG_DATA(hdr) + offsetof(in_pktinfo, ipi_addr), sizeof(in_addr_t));
}
decltype(in_pktinfo::ipi_ifindex) idx;
memcpy(&idx, CMSG_DATA(hdr) + offsetof(in_pktinfo, ipi_ifindex), sizeof(idx));
dstif = idx;
}
#else
# ifdef IP_ORIGDSTADDR
else if(dst && hdr->cmsg_level==IPPROTO_IP && hdr->cmsg_type==IP_ORIGDSTADDR && hdr->cmsg_len>=CMSG_LEN(sizeof(sockaddr_in))) {
memcpy(&(*dst)->in, CMSG_DATA(hdr), sizeof(sockaddr_in));
}
# endif
# ifdef IP_RECVIF
else if(dst && hdr->cmsg_level==IPPROTO_IP && hdr->cmsg_type==IP_RECVIF && hdr->cmsg_len>=CMSG_LEN(sizeof(sockaddr_dl))) {
decltype (sockaddr_dl::sdl_index) idx;
memcpy(&idx, CMSG_DATA(hdr) + offsetof(sockaddr_dl, sdl_index), sizeof(idx));
dstif = idx;
}
# endif
#endif
}
}
return ret;
}
namespace impl {
void IfaceMap::refresh() {
ifaddrs* addrs = nullptr;
decltype (info) temp;
if(getifaddrs(&addrs)) {
log_warn_printf(logiface, "Unable to getifaddrs() errno=%d\n", errno);
return;
}
try {
for(const ifaddrs* ifa = addrs; ifa; ifa = ifa->ifa_next) {
if(ifa->ifa_addr->sa_family!=AF_INET) {
log_debug_printf(logiface, "Ignoring interface '%s' address !ipv4\n", ifa->ifa_name);
continue;
}
auto idx(if_nametoindex(ifa->ifa_name));
if(idx<=0) {
log_warn_printf(logiface, "Unable to find index of interface '%s'\n", ifa->ifa_name);
continue;
}
//TODO: any flags to check?
auto pair = temp[idx].emplace(ifa->ifa_addr, sizeof(sockaddr_in));
log_debug_printf(logiface, "Found interface %lld \"%s\" w/ %s\n",
(long long)idx, ifa->ifa_name, pair.first->tostring().c_str());
}
} catch(...){
freeifaddrs(addrs);
throw;
}
freeifaddrs(addrs);
info.swap(temp);
}
} // namespace impl
} // namespace pvxs
+116
View File
@@ -0,0 +1,116 @@
/**
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*/
#ifndef OSISOCKEXT_H
#define OSISOCKEXT_H
#include <osiSock.h>
#include <string>
#include <event2/util.h>
#include <pvxs/version.h>
namespace pvxs {
PVXS_API
void osiSockAttachExt();
struct SockAttach {
SockAttach() { osiSockAttachExt(); }
~SockAttach() { osiSockRelease(); }
};
//! representation of a network address
struct PVXS_API SockAddr {
union store_t {
sockaddr sa;
sockaddr_in in;
#ifdef AF_INET6
sockaddr_in6 in6;
#endif
};
private:
store_t store;
public:
explicit SockAddr(int af = AF_UNSPEC);
explicit SockAddr(int af, const char *address, unsigned short port=0);
explicit SockAddr(const sockaddr *addr, ev_socklen_t len);
inline explicit SockAddr(int af, const std::string& address) :SockAddr(af, address.c_str()) {}
size_t size() const;
inline unsigned short family() const { return store.sa.sa_family; }
unsigned short port() const;
void setPort(unsigned short port);
SockAddr withPort(unsigned short port) const {
SockAddr temp(*this);
temp.setPort(port);
return temp;
}
void setAddress(const char *, unsigned short port=0);
bool isAny() const;
bool isLO() const;
store_t* operator->() { return &store; }
const store_t* operator->() const { return &store; }
std::string tostring() const;
static SockAddr any(int af, unsigned port=0);
static SockAddr loopback(int af, unsigned port=0);
inline int compare(const SockAddr& o, bool useport=true) const {
return evutil_sockaddr_cmp(&store.sa, &o.store.sa, useport);
}
inline bool operator<(const SockAddr& o) const {
return evutil_sockaddr_cmp(&store.sa, &o.store.sa, true)<0;
}
inline bool operator==(const SockAddr& o) const {
return evutil_sockaddr_cmp(&store.sa, &o.store.sa, true)==0;
}
inline bool operator!=(const SockAddr& o) const {
return !(*this==o);
}
};
// compare address only, ignore port number
struct SockAddrOnlyLess {
bool operator()(const SockAddr& lhs, const SockAddr& rhs) const {
return lhs.compare(rhs, false)<0;
}
};
PVXS_API
std::ostream& operator<<(std::ostream& strm, const SockAddr& addr);
// Linux specific include OS dropped packet counter as cmsg
void enable_SO_RXQ_OVFL(SOCKET sock);
// Include destination address as cmsg
PVXS_API
void enable_IP_PKTINFO(SOCKET sock);
struct recvfromx {
evutil_socket_t sock;
void *buf;
size_t buflen;
SockAddr* src;
SockAddr* dst; // if enable_IP_PKTINFO()
int64_t dstif; // if enable_IP_PKTINFO(), destination interface index
uint32_t ndrop; // if enable_SO_RXQ_OVFL()
PVXS_API
int call();
};
} // namespace pvxs
#endif // OSISOCKEXT_H
+9 -7
View File
@@ -52,16 +52,16 @@ struct UDPCollector : public UDPManager::Search,
bool handle_one()
{
osiSocklen_t alen = src.size();
uint32_t ndrop = 0u;
SockAddr dest;
// For Search messages, we use PV name strings in-place by adding nils.
// Ensure one extra byte at the end of the buffer for a nil after the last PV name
const int nrx = recvfromx(sock.sock, (char*)&buf[0], buf.size()-1, &src->sa, &alen, &ndrop);
recvfromx rx{sock.sock, (char*)&buf[0], buf.size()-1, &src, &dest};
const int nrx = rx.call();
if(nrx>=0 && ndrop!=0u && prevndrop!=ndrop) {
log_debug_printf(logio, "UDP collector socket buffer overflowed %u -> %u\n", unsigned(prevndrop), unsigned(ndrop));
prevndrop = ndrop;
if(nrx>=0 && rx.ndrop!=0u && prevndrop!=rx.ndrop) {
log_debug_printf(logio, "UDP collector socket buffer overflowed %u -> %u\n", unsigned(prevndrop), unsigned(rx.ndrop));
prevndrop = rx.ndrop;
}
if(nrx<0) {
@@ -93,7 +93,8 @@ struct UDPCollector : public UDPManager::Search,
return true;
}
log_hex_printf(logio, Level::Debug, &buf[0], nrx, "UDP Rx %d from %s\n", nrx, src.tostring().c_str());
log_hex_printf(logio, Level::Debug, &buf[0], nrx, "UDP Rx %d, %s -> %s\n",
nrx, src.tostring().c_str(), dest.tostring().c_str());
names.clear();
@@ -268,6 +269,7 @@ UDPCollector::UDPCollector(UDPManager::Pvt *manager, const SockAddr& bind_addr)
epicsSocketEnableAddressUseForDatagramFanout(sock.sock);
enable_SO_RXQ_OVFL(sock.sock);
enable_IP_PKTINFO(sock.sock);
sock.bind(this->bind_addr);
name = "UDP "+this->bind_addr.tostring();
-49
View File
@@ -256,55 +256,6 @@ SigInt::~SigInt()
#endif // !defined(__rtems__) && !defined(vxWorks)
void enable_SO_RXQ_OVFL(SOCKET sock)
{
#ifdef SO_RXQ_OVFL
// Linux specific feature exposes OS dropped packet count
{
int val = 1;
if(setsockopt(sock, SOL_SOCKET, SO_RXQ_OVFL, (char*)&val, sizeof(val)))
log_warn_printf(log, "Unable to set SO_RXQ_OVFL: %d\n", SOCKERRNO);
}
#endif
}
int recvfromx(SOCKET sock, void *buf, size_t buflen, sockaddr* peer, osiSocklen_t* peerlen, uint32_t *ndrop)
{
#ifdef SO_RXQ_OVFL
alignas (alignof (cmsghdr)) char cbuf[CMSG_SPACE(4u)];
iovec iov = {buf, buflen};
msghdr msg = {};
msg.msg_iov = &iov;
msg.msg_iovlen = 1u;
msg.msg_name = peer;
msg.msg_namelen = peerlen ? *peerlen : 0;
msg.msg_control = cbuf;
msg.msg_controllen = sizeof(cbuf);
int ret = recvmsg(sock, &msg, 0);
if(ret>=0) {
if(peerlen)
*peerlen = msg.msg_namelen;
if(msg.msg_flags & MSG_CTRUNC)
log_debug_printf(log, "MSG_CTRUNC %zu, %zu\n", msg.msg_controllen, sizeof(cbuf));
if(ndrop) {
for(cmsghdr *hdr = CMSG_FIRSTHDR(&msg); hdr ; hdr = CMSG_NXTHDR(&msg, hdr)) {
if(hdr->cmsg_level==SOL_SOCKET && hdr->cmsg_type==SO_RXQ_OVFL && hdr->cmsg_len>=CMSG_LEN(4u)) {
memcpy(ndrop, CMSG_DATA(hdr), 4u);
}
}
}
}
return ret;
#else
return recvfrom(sock, (char*)buf, buflen, 0, peer, peerlen);
#endif
}
SockAddr::SockAddr(int af)
{
+1 -66
View File
@@ -6,7 +6,7 @@
#ifndef UTILPVT_H
#define UTILPVT_H
#include <osiSock.h>
#include "osiSockExt.h"
#ifdef _WIN32
# define WIN32_LEAN_AND_MEAN
@@ -196,71 +196,6 @@ using aligned_union = std::aligned_union<Len, Types...>;
} // namespace impl
using namespace impl;
struct SockAttach {
SockAttach() { osiSockAttach(); }
~SockAttach() { osiSockRelease(); }
};
// Linux specific SO_RXQ_OVFL exposes OS dropped packet counter
void enable_SO_RXQ_OVFL(SOCKET sock);
int recvfromx(SOCKET sock, void *buf, size_t buflen, sockaddr* peer, osiSocklen_t* peerlen, uint32_t *ndrop);
//! representation of a network address
struct PVXS_API SockAddr {
union store_t {
sockaddr sa;
sockaddr_in in;
#ifdef AF_INET6
sockaddr_in6 in6;
#endif
};
private:
store_t store;
public:
explicit SockAddr(int af = AF_UNSPEC);
explicit SockAddr(int af, const char *address, unsigned short port=0);
explicit SockAddr(const sockaddr *addr, ev_socklen_t len);
inline explicit SockAddr(int af, const std::string& address) :SockAddr(af, address.c_str()) {}
size_t size() const;
inline unsigned short family() const { return store.sa.sa_family; }
unsigned short port() const;
void setPort(unsigned short port);
SockAddr withPort(unsigned short port) {
SockAddr temp(*this);
temp.setPort(port);
return temp;
}
void setAddress(const char *, unsigned short port=0);
bool isAny() const;
bool isLO() const;
store_t* operator->() { return &store; }
const store_t* operator->() const { return &store; }
std::string tostring() const;
static SockAddr any(int af, unsigned port=0);
static SockAddr loopback(int af, unsigned port=0);
inline bool operator<(const SockAddr& o) const {
return evutil_sockaddr_cmp(&store.sa, &o.store.sa, true)<0;
}
inline bool operator==(const SockAddr& o) const {
return evutil_sockaddr_cmp(&store.sa, &o.store.sa, true)==0;
}
inline bool operator!=(const SockAddr& o) const {
return !(*this==o);
}
};
PVXS_API
std::ostream& operator<<(std::ostream& strm, const SockAddr& addr);
inline
timeval totv(double t)
{
+148 -11
View File
@@ -4,6 +4,8 @@
* in file LICENSE that is included with this distribution.
*/
#include <osiSockExt.h>
#include <cstring>
#include <epicsUnitTest.h>
@@ -15,9 +17,43 @@
#include <pvxs/unittest.h>
#include <pvxs/log.h>
#ifdef _WIN32
# include <windows.h>
# include <psapi.h>
static
bool is_wine()
{
HMODULE nt = GetModuleHandle("ntdll.dll");
return nt && GetProcAddress(nt, "wine_get_version");
}
#endif
namespace {
using namespace pvxs;
void test_ifacemap()
{
testDiag("Enter %s", __func__);
impl::IfaceMap ifs;
ifs.refresh();
testFalse(ifs.info.empty())<<" found "<<ifs.info.size()<<" interfaces";
bool foundlo = false;
const auto lo(SockAddr::loopback(AF_INET));
for(const auto& iface : ifs.info) {
for(const auto& addr : iface.second) {
if(addr!=lo)
continue;
testTrue(!foundlo)<<" Found loopback with index "<<iface.first;
foundlo = true;
}
}
}
void test_udp()
{
testDiag("Enter %s", __func__);
@@ -27,6 +63,7 @@ void test_udp()
SockAddr bind_addr(SockAddr::loopback(AF_INET));
enable_IP_PKTINFO(A.sock);
A.bind(bind_addr);
testNotEq(bind_addr.port(), 0)<<"bound port";
@@ -42,32 +79,35 @@ void test_udp()
uint8_t rxbuf[8] = {};
SockAddr src;
SockAddr dest;
testDiag("Call recvfrom()");
socklen_t slen = src.size();
ret = recvfrom(A.sock, (char*)rxbuf, sizeof(rxbuf), 0, &src->sa, &slen);
ret = recvfromx{A.sock, (char*)rxbuf, sizeof(rxbuf), &src, &dest}.call();
// only the destination address is captured, not the port
if(dest.family()==AF_INET)
dest.setPort(bind_addr.port());
testOk(ret==4 && rxbuf[0]==0x12 && rxbuf[1]==0x34 && rxbuf[2]==0x56 && rxbuf[3]==0x78,
"Recv'd %d(%d) [%u, %u, %u, %u]", ret, EVUTIL_SOCKET_ERROR(), rxbuf[0], rxbuf[1], rxbuf[2], rxbuf[3]);
testEq(src, send_addr);
testEq(dest, bind_addr);
}
void test_local_mcast()
{
testDiag("Enter %s", __func__);
IfaceMap ifinfo;
evsocket A(AF_INET, SOCK_DGRAM, 0),
B(AF_INET, SOCK_DGRAM, 0);
SockAddr mcast_addr(AF_INET);
mcast_addr.setAddress("224.0.0.128");
SockAddr mcast_addr(AF_INET, "224.0.0.128");
#ifdef _WIN32
// We could bind to mcast_addr on all targets except WIN32
SockAddr bind_addr(SockAddr::any(AF_INET));
#else
SockAddr bind_addr(mcast_addr);
#endif
enable_IP_PKTINFO(A.sock);
A.bind(bind_addr);
mcast_addr.setPort(bind_addr.port());
@@ -88,14 +128,108 @@ void test_local_mcast()
uint8_t rxbuf[8] = {};
SockAddr src;
SockAddr dest;
testDiag("Call recvfrom()");
socklen_t slen = src.size();
ret = recvfrom(A.sock, (char*)rxbuf, sizeof(rxbuf), 0, &src->sa, &slen);
recvfromx rx{A.sock, (char*)rxbuf, sizeof(rxbuf), &src, &dest};
ret = rx.call();
if(dest.family()==AF_INET)
dest.setPort(mcast_addr.port());
testTrue(ret>=0 && rx.dstif>0 && ifinfo.has_address(rx.dstif, sender_addr))
<<" received on index "<<rx.dstif;
testOk(ret==4 && rxbuf[0]==0x12 && rxbuf[1]==0x34 && rxbuf[2]==0x56 && rxbuf[3]==0x78,
"Recv'd %d [%u, %u, %u, %u]", ret, rxbuf[0], rxbuf[1], rxbuf[2], rxbuf[3]);
testEq(src, sender_addr);
testEq(dest, mcast_addr);
}
void test_mcast_scope()
{
testDiag("Enter %s", __func__);
SockAddr mcast_addr(AF_INET, "224.0.0.128");
auto any(SockAddr::any(AF_INET));
auto lo(SockAddr::loopback(AF_INET));
auto sender(SockAddr::loopback(AF_INET));
evsocket TX (AF_INET, SOCK_DGRAM, 0),
RX1(AF_INET, SOCK_DGRAM, 0),
RX2(AF_INET, SOCK_DGRAM, 0),
RX3(AF_INET, SOCK_DGRAM, 0),
RX4(AF_INET, SOCK_DGRAM, 0);
epicsSocketEnableAddressUseForDatagramFanout(RX1.sock);
epicsSocketEnableAddressUseForDatagramFanout(RX2.sock);
epicsSocketEnableAddressUseForDatagramFanout(RX3.sock);
epicsSocketEnableAddressUseForDatagramFanout(RX4.sock);
TX.mcast_loop(true);
TX.mcast_ttl(1u);
// endure message goes out through LO
TX.mcast_iface(lo);
TX.bind(sender);
testShow()<<" sender bound to "<<sender;
// ordering of bind() before joining mcast group is "strongly recommended"
// by winsock bind() documentation
RX1.bind(any);
mcast_addr.setPort(any.port()); // bind all RX* to the same port
lo.setPort(any.port());
testShow()<<" RX1 bound to "<<any;
RX2.bind(any);
testShow()<<" RX2 bound to "<<any;
RX3.bind(lo);
testShow()<<" RX3 bound to "<<lo;
#ifndef _WIN32
// winsock doesn't allow binding to an mcast address
RX4.bind(mcast_addr);
testShow()<<" RX4 bound to "<<mcast_addr;
#endif
testShow()<<" Join RX1 to "<<mcast_addr<<" on "<<lo;
RX1.mcast_join(mcast_addr, lo);
const char msg[] = "hello world!";
auto msglen = sizeof(msg)-1u;
auto ret = sendto(TX.sock, msg, msglen, 0, &mcast_addr->sa, lo.size());
testEq(ret, int(msglen))<<" sendto("<<sender<<" -> "<<mcast_addr<<") err="<<EVUTIL_SOCKET_ERROR();
auto doRX = [&lo, &msg, msglen](unsigned idx, evsocket& sock, bool expectrx) {
testShow()<<"RX"<<idx<<" expect "<<(expectrx ? "success" : "failure");
char buf[sizeof(msg)-1u+2u];
SockAddr src, dest;
recvfromx rx{sock.sock, buf, sizeof(buf), &src, &dest};
auto ret = rx.call();
if(expectrx) {
testEq(ret, int(msglen))<<" recvfrom() RX"<<idx<<" err="<<EVUTIL_SOCKET_ERROR()<<" src="<<src;
testTrue(lo.compare(src))<<" RX"<<idx<<" from "<<src;
testTrue(memcmp(buf, msg, msglen)==0)<<" RX"<<idx;
} else {
testTrue(ret<0)<<" RX"<<idx<<" expected error ret="<<ret<<" err="<<EVUTIL_SOCKET_ERROR();
testSkip(2, "Not relevant");
}
};
#ifdef _WIN32
doRX(1, RX1, true);
doRX(2, RX2, is_wine()); // really Linux IP stack, and we couldn't clear IP_MULTICAST_ALL
doRX(3, RX3, false);
testSkip(3, "winsock doesn't allow bind() to an mcast address");
#else
doRX(1, RX1, true);
doRX(2, RX2, false);
doRX(3, RX3, false);
doRX(4, RX4, false);
#endif
}
void test_from_wire()
@@ -207,10 +341,13 @@ void test_to_wire()
MAIN(testsock)
{
SockAttach attach;
testPlan(33);
logger_config_env();
testPlan(51);
testSetup();
test_ifacemap();
test_udp();
test_local_mcast();
test_mcast_scope();
test_from_wire();
test_to_wire();
testDiag("Done");