detect UDP RX buffer overflows
On Linux, use SO_RXQ_OVFL to snoop on the OS dropped packet counter for our socket buffer. Does not detect drops on ingress, or internal to, to OS IP stack.
This commit is contained in:
+8
-1
@@ -340,6 +340,7 @@ Context::Pvt::Pvt(const Config& conf)
|
||||
if(setsockopt(searchTx.sock, SOL_SOCKET, SO_BROADCAST, (char *)&val, sizeof(val)))
|
||||
log_err_printf(setup, "Unable to setup beacon sender SO_BROADCAST: %d\n", SOCKERRNO);
|
||||
}
|
||||
enable_SO_RXQ_OVFL(searchTx.sock);
|
||||
|
||||
for(auto& addr : effective.addressList) {
|
||||
auto isbcast = bcasts.find(addr)!=bcasts.end();
|
||||
@@ -466,9 +467,15 @@ bool Context::Pvt::onSearch()
|
||||
{
|
||||
searchMsg.resize(0x10000);
|
||||
SockAddr src;
|
||||
uint32_t ndrop = 0u;
|
||||
|
||||
osiSocklen_t alen = src.size();
|
||||
const int nrx = recvfrom(searchTx.sock, (char*)&searchMsg[0], searchMsg.size()-1, 0, &src->sa, &alen);
|
||||
const int nrx = recvfromx(searchTx.sock, (char*)&searchMsg[0], searchMsg.size()-1, &src->sa, &alen, &ndrop);
|
||||
|
||||
if(nrx>=0 && ndrop!=0 && prevndrop!=ndrop) {
|
||||
log_debug_printf(io, "UDP search reply buffer overflow %u -> %u\n", unsigned(prevndrop), unsigned(ndrop));
|
||||
prevndrop = ndrop;
|
||||
}
|
||||
|
||||
if(nrx<0) {
|
||||
int err = evutil_socket_geterror(searchTx.sock);
|
||||
|
||||
@@ -181,6 +181,7 @@ struct Context::Pvt
|
||||
const Value caMethod;
|
||||
|
||||
uint32_t nextCID=0x12345678;
|
||||
uint32_t prevndrop = 0u;
|
||||
|
||||
evsocket searchTx;
|
||||
uint16_t searchRxPort;
|
||||
|
||||
@@ -39,6 +39,7 @@ struct UDPCollector : public UDPManager::Search,
|
||||
std::string name;
|
||||
evsocket sock;
|
||||
evevent rx;
|
||||
uint32_t prevndrop;
|
||||
|
||||
std::vector<uint8_t> buf;
|
||||
|
||||
@@ -52,10 +53,16 @@ struct UDPCollector : public UDPManager::Search,
|
||||
bool handle_one()
|
||||
{
|
||||
osiSocklen_t alen = src.size();
|
||||
uint32_t ndrop = 0u;
|
||||
|
||||
// For Search messages, we use PV name strings in-place by adding nils.
|
||||
// Ensure one extra byte at the end of the buffer for a nil after the last PV name
|
||||
const int nrx = recvfrom(sock.sock, (char*)&buf[0], buf.size()-1, 0, &src->sa, &alen);
|
||||
const int nrx = recvfromx(sock.sock, (char*)&buf[0], buf.size()-1, &src->sa, &alen, &ndrop);
|
||||
|
||||
if(nrx>=0 && ndrop!=0u && prevndrop!=ndrop) {
|
||||
log_debug_printf(logio, "UDP collector socket buffer overflowed %u -> %u\n", unsigned(prevndrop), unsigned(ndrop));
|
||||
prevndrop = ndrop;
|
||||
}
|
||||
|
||||
if(nrx<0) {
|
||||
int err = evutil_socket_geterror(sock.sock);
|
||||
@@ -261,6 +268,7 @@ UDPCollector::UDPCollector(const std::shared_ptr<UDPManager::Pvt>& manager, cons
|
||||
manager->loop.assertInLoop();
|
||||
|
||||
epicsSocketEnableAddressUseForDatagramFanout(sock.sock);
|
||||
enable_SO_RXQ_OVFL(sock.sock);
|
||||
sock.bind(this->bind_addr);
|
||||
name = "UDP "+this->bind_addr.tostring();
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
|
||||
#include <ctype.h>
|
||||
|
||||
#include <pvxs/log.h>
|
||||
#include <pvxs/util.h>
|
||||
#include <pvxs/sharedArray.h>
|
||||
#include <pvxs/data.h>
|
||||
@@ -25,6 +26,8 @@
|
||||
|
||||
namespace pvxs {
|
||||
|
||||
DEFINE_LOGGER(log, "pvxs.util");
|
||||
|
||||
#define stringifyX(X) #X
|
||||
#define stringify(X) stringifyX(X)
|
||||
|
||||
@@ -286,6 +289,54 @@ SigInt::~SigInt()
|
||||
|
||||
#endif // !defined(__rtems__) && !defined(vxWorks)
|
||||
|
||||
void enable_SO_RXQ_OVFL(SOCKET sock)
|
||||
{
|
||||
#ifdef SO_RXQ_OVFL
|
||||
// Linux specific feature exposes OS dropped packet count
|
||||
{
|
||||
int val = 1;
|
||||
if(setsockopt(sock, SOL_SOCKET, SO_RXQ_OVFL, (char*)&val, sizeof(val)))
|
||||
log_warn_printf(log, "Unable to set SO_RXQ_OVFL: %d\n", SOCKERRNO);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
int recvfromx(SOCKET sock, void *buf, size_t buflen, sockaddr* peer, osiSocklen_t* peerlen, uint32_t *ndrop)
|
||||
{
|
||||
#ifdef SO_RXQ_OVFL
|
||||
alignas (alignof (msghdr)) char cbuf[CMSG_SPACE(4u)];
|
||||
iovec iov = {buf, buflen};
|
||||
msghdr msg = {};
|
||||
msg.msg_iov = &iov;
|
||||
msg.msg_iovlen = 1u;
|
||||
msg.msg_name = peer;
|
||||
msg.msg_namelen = peerlen ? *peerlen : 0;
|
||||
msg.msg_control = cbuf;
|
||||
msg.msg_controllen = sizeof(cbuf);
|
||||
|
||||
int ret = recvmsg(sock, &msg, 0);
|
||||
|
||||
if(ret>=0 && peerlen)
|
||||
*peerlen = msg.msg_namelen;
|
||||
|
||||
if(msg.msg_flags & MSG_CTRUNC)
|
||||
log_debug_printf(log, "MSG_CTRUNC %zu, %zu\n", msg.msg_controllen, sizeof(cbuf));
|
||||
|
||||
if(ndrop) {
|
||||
for(cmsghdr *hdr = CMSG_FIRSTHDR(&msg); hdr ; hdr = CMSG_NXTHDR(&msg, hdr)) {
|
||||
if(hdr->cmsg_level==SOL_SOCKET && hdr->cmsg_type==SO_RXQ_OVFL && hdr->cmsg_len>=CMSG_LEN(4u)) {
|
||||
memcpy(ndrop, CMSG_DATA(hdr), 4u);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
||||
#else
|
||||
return recvfrom(sock, (char*)buf, buflen, 0, peer, peerlen);
|
||||
#endif
|
||||
}
|
||||
|
||||
SockAddr::SockAddr(int af)
|
||||
{
|
||||
memset(&store, 0, sizeof(store));
|
||||
|
||||
@@ -196,6 +196,10 @@ struct SockAttach {
|
||||
~SockAttach() { osiSockRelease(); }
|
||||
};
|
||||
|
||||
// Linux specific SO_RXQ_OVFL exposes OS dropped packet counter
|
||||
void enable_SO_RXQ_OVFL(SOCKET sock);
|
||||
int recvfromx(SOCKET sock, void *buf, size_t buflen, sockaddr* peer, osiSocklen_t* peerlen, uint32_t *ndrop);
|
||||
|
||||
//! representation of a network address
|
||||
struct PVXS_API SockAddr {
|
||||
union store_t {
|
||||
|
||||
Reference in New Issue
Block a user