redo UDP handling
This commit is contained in:
@@ -244,35 +244,6 @@ void to_wire(sbuf<uint8_t>& buf, const SockAddr &val, bool be)
|
||||
buf += 16;
|
||||
}
|
||||
|
||||
void from_wire(sbuf<const uint8_t> &buf, SockAddr& val, bool be)
|
||||
{
|
||||
if(buf.err || buf.size()<16) {
|
||||
buf.err = true;
|
||||
return;
|
||||
}
|
||||
|
||||
// win32 lacks IN6_IS_ADDR_V4MAPPED()
|
||||
bool ismapped = true;
|
||||
for(unsigned i=0u; i<10; i++)
|
||||
ismapped &= buf[i]==0;
|
||||
ismapped &= buf[10]==0xff;
|
||||
ismapped &= buf[11]==0xff;
|
||||
|
||||
if(ismapped) {
|
||||
val->in = {};
|
||||
val->in.sin_family = AF_INET;
|
||||
memcpy(&val->in.sin_addr.s_addr, buf.pos+12, 4);
|
||||
|
||||
} else {
|
||||
val->in6 = {};
|
||||
val->in6.sin6_family = AF_INET6;
|
||||
|
||||
static_assert (sizeof(val->in6.sin6_addr)==16, "");
|
||||
memcpy(&val->in6.sin6_addr, buf.pos, 16);
|
||||
}
|
||||
buf += 16;
|
||||
}
|
||||
|
||||
evsocket::evsocket(evutil_socket_t sock)
|
||||
:sock(sock)
|
||||
{
|
||||
|
||||
+37
-2
@@ -22,6 +22,14 @@
|
||||
|
||||
#include "pvaproto.h"
|
||||
|
||||
// hooks for std::unique_ptr
|
||||
namespace std {
|
||||
template<>
|
||||
struct default_delete<event> {
|
||||
inline void operator()(event* ev) { event_free(ev); }
|
||||
};
|
||||
}
|
||||
|
||||
namespace pvxsimpl {
|
||||
using namespace pvxs;
|
||||
|
||||
@@ -102,8 +110,35 @@ struct evlisten {
|
||||
PVXS_API
|
||||
void to_wire(sbuf<uint8_t>& buf, const SockAddr& val, bool be);
|
||||
|
||||
PVXS_API
|
||||
void from_wire(sbuf<const uint8_t>& buf, SockAddr& val, bool be);
|
||||
template <typename B>
|
||||
void from_wire(sbuf<B> &buf, SockAddr& val, bool be)
|
||||
{
|
||||
if(buf.err || buf.size()<16) {
|
||||
buf.err = true;
|
||||
return;
|
||||
}
|
||||
|
||||
// win32 lacks IN6_IS_ADDR_V4MAPPED()
|
||||
bool ismapped = true;
|
||||
for(unsigned i=0u; i<10; i++)
|
||||
ismapped &= buf[i]==0;
|
||||
ismapped &= buf[10]==0xff;
|
||||
ismapped &= buf[11]==0xff;
|
||||
|
||||
if(ismapped) {
|
||||
val->in = {};
|
||||
val->in.sin_family = AF_INET;
|
||||
memcpy(&val->in.sin_addr.s_addr, buf.pos+12, 4);
|
||||
|
||||
} else {
|
||||
val->in6 = {};
|
||||
val->in6.sin6_family = AF_INET6;
|
||||
|
||||
static_assert (sizeof(val->in6.sin6_addr)==16, "");
|
||||
memcpy(&val->in6.sin6_addr, buf.pos, 16);
|
||||
}
|
||||
buf += 16;
|
||||
}
|
||||
|
||||
struct PVXS_API evsocket
|
||||
{
|
||||
|
||||
+29
-6
@@ -47,8 +47,8 @@ struct sbuf {
|
||||
}
|
||||
};
|
||||
|
||||
template <unsigned N>
|
||||
inline void _from_wire(sbuf<const uint8_t>& buf, uint8_t *mem, bool reverse)
|
||||
template <unsigned N, typename B>
|
||||
inline void _from_wire(sbuf<B>& buf, uint8_t *mem, bool reverse)
|
||||
{
|
||||
if(buf.err || buf.size()<N) {
|
||||
buf.err = true;
|
||||
@@ -73,8 +73,8 @@ inline void _from_wire(sbuf<const uint8_t>& buf, uint8_t *mem, bool reverse)
|
||||
* @param val output variable
|
||||
* @param be true if value encoded in buf is in MSBF order, false if in LSBF order
|
||||
*/
|
||||
template<typename T, typename std::enable_if<std::is_scalar<T>::value, int>::type =0>
|
||||
inline void from_wire(sbuf<const uint8_t>& buf, T& val, bool be)
|
||||
template<typename T, typename B, typename std::enable_if<std::is_scalar<T>::value, int>::type =0>
|
||||
inline void from_wire(sbuf<B>& buf, T& val, bool be)
|
||||
{
|
||||
union {
|
||||
T v;
|
||||
@@ -92,8 +92,31 @@ struct Size {
|
||||
explicit Size(T& size) :size(&size) {}
|
||||
};
|
||||
|
||||
PVXS_API
|
||||
void from_wire(sbuf<const uint8_t>& buf, Size<size_t> size, bool be);
|
||||
template<typename B>
|
||||
void from_wire(sbuf<B>& buf, Size<size_t> size, bool be)
|
||||
{
|
||||
if(buf.err || buf.empty()) {
|
||||
buf.err = true;
|
||||
return;
|
||||
}
|
||||
uint8_t s=buf[0];
|
||||
buf+=1;
|
||||
if(s<254) {
|
||||
*size.size = s;
|
||||
|
||||
} else if(s==255) {
|
||||
// "null" size. not sure it is used. Replicate weirdness of pvDataCPP
|
||||
*size.size = -1;
|
||||
|
||||
} else if(s==254) {
|
||||
uint32_t ls = 0;
|
||||
from_wire(buf, ls, be);
|
||||
*size.size = ls;
|
||||
} else {
|
||||
// unreachable
|
||||
buf.err = true;
|
||||
}
|
||||
}
|
||||
|
||||
template<unsigned N>
|
||||
inline void _to_wire(sbuf<uint8_t>& buf, const uint8_t *mem, bool reverse)
|
||||
|
||||
+1
-1
@@ -278,7 +278,7 @@ void Server::Pvt::start()
|
||||
for(auto& iface : interfaces) {
|
||||
auto addr = iface.bind_addr;
|
||||
addr.setPort(effective.default_udp);
|
||||
iface.searchrx = manager.subscribe(addr, [](const UDPMsg& msg) {
|
||||
iface.searchrx = manager.onSearch(addr, [](const UDPManager::Search& msg) {
|
||||
// TODO handle search
|
||||
});
|
||||
}
|
||||
|
||||
+234
-125
@@ -31,7 +31,18 @@ namespace pvxsimpl {
|
||||
DEFINE_LOGGER(logio, "udp.io");
|
||||
DEFINE_LOGGER(logsetup, "udp.setup");
|
||||
|
||||
struct UDPCollector : public std::enable_shared_from_this<UDPCollector>
|
||||
struct UDPListener : public std::enable_shared_from_this<UDPListener>
|
||||
{
|
||||
std::function<void(UDPManager::Search&)> searchCB;
|
||||
std::function<void(UDPManager::Beacon&)> beaconCB;
|
||||
std::shared_ptr<UDPCollector> collector;
|
||||
const SockAddr dest;
|
||||
UDPListener(UDPManager::Pvt *manager, const SockAddr& dest);
|
||||
~UDPListener();
|
||||
};
|
||||
|
||||
struct UDPCollector : public UDPManager::Search,
|
||||
public std::enable_shared_from_this<UDPCollector>
|
||||
{
|
||||
const std::shared_ptr<UDPManager::Pvt> manager;
|
||||
SockAddr bind_addr;
|
||||
@@ -40,106 +51,179 @@ struct UDPCollector : public std::enable_shared_from_this<UDPCollector>
|
||||
evevent rx;
|
||||
|
||||
std::vector<uint8_t> buf;
|
||||
std::vector<sbuf<const uint8_t> > msgs;
|
||||
UDPMsg msg;
|
||||
|
||||
UDPManager::Beacon beaconMsg;
|
||||
|
||||
std::set<UDPListener*> listeners;
|
||||
|
||||
UDPCollector(const std::shared_ptr<UDPManager::Pvt>& manager, const SockAddr& bind_addr);
|
||||
~UDPCollector();
|
||||
|
||||
bool handle_one()
|
||||
{
|
||||
osiSocklen_t alen = src.size();
|
||||
|
||||
// For Search messages, we use PV name strings in-place by adding nils.
|
||||
// Ensure one extra byte at the end of the buffer for a nil after the last PV name
|
||||
const int nrx = recvfrom(sock.sock, (char*)&buf[0], buf.size()-1, 0, &src->sa, &alen);
|
||||
log_printf(logio, PLVL_DEBUG, "recvfrom() -> %d\n", nrx);
|
||||
|
||||
if(nrx<0) {
|
||||
int err = evutil_socket_geterror(sock.sock);
|
||||
if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) {
|
||||
// nothing to do here
|
||||
} else {
|
||||
log_printf(logio, PLVL_WARN, "UDP RX Error on %s : %s\n", name.c_str(),
|
||||
evutil_socket_error_to_string(err));
|
||||
}
|
||||
return false; // wait for more I/O
|
||||
|
||||
} else if(nrx<8) {
|
||||
// maybe a zero (body) length packet?
|
||||
// maybe an OS error?
|
||||
|
||||
log_printf(logio, PLVL_INFO, "UDP ignore runt on %s\n", name.c_str());
|
||||
return true;
|
||||
|
||||
} else if(buf[0]!=0xca || buf[1]==0 || (buf[2]&(pva_flags::Control|pva_flags::SegMask))) {
|
||||
// minimum header size is 8 bytes
|
||||
// ID byte must by 0xCA (because PVA has some paternal envy)
|
||||
// ignore incompatible version 0
|
||||
// UDP packets can't contain control messages, or use segmentation
|
||||
|
||||
log_printf(logio, PLVL_INFO, "UDP ignore header%u %02x%02x%02x%02x on %s\n",
|
||||
unsigned(nrx), buf[0], buf[1], buf[2], buf[3],
|
||||
name.c_str());
|
||||
return true;
|
||||
}
|
||||
|
||||
log_hex_printf(logio, PLVL_DEBUG, &buf[0], nrx, "UDP Rx from %s", src.tostring().c_str());
|
||||
|
||||
names.clear();
|
||||
|
||||
sbuf<uint8_t> M(&buf[0], size_t(nrx));
|
||||
|
||||
uint8_t cmd = M[3];
|
||||
|
||||
bool be = M[2]&pva_flags::MSB;
|
||||
M += 4;
|
||||
uint32_t len=0;
|
||||
from_wire(M, len, be);
|
||||
|
||||
if(len > M.size() && !M.err) {
|
||||
log_printf(logio, PLVL_INFO, "UDP ignore header%u %02x%02x%02x%02x on %s\n",
|
||||
unsigned(M.size()), M[0], M[1], M[2], M[3],
|
||||
name.c_str());
|
||||
return true;
|
||||
}
|
||||
|
||||
switch(cmd) {
|
||||
|
||||
case pva_app_msg::Search: {
|
||||
uint32_t id;
|
||||
SockAddr replyAddr;
|
||||
|
||||
from_wire(M, id, be);
|
||||
M += 4; // flags and unused/reserved
|
||||
|
||||
from_wire(M, replyAddr, be);
|
||||
uint16_t port = 0;
|
||||
from_wire(M, port, be);
|
||||
replyAddr.setPort(port);
|
||||
|
||||
// so far, only "tcp" transport has ever been seen.
|
||||
// however, we will consider and ignore any others which might appear
|
||||
bool foundtcp = false;
|
||||
size_t nproto=0;
|
||||
from_wire(M, Size<size_t>(nproto), be);
|
||||
for(size_t i=0; i<nproto && !M.err; i++) {
|
||||
size_t nchar=0;
|
||||
from_wire(M, Size<size_t>(nchar), be);
|
||||
|
||||
if(M.size()>=3 && nchar==3 && M[0]=='t' && M[1]=='c' && M[2]=='p') {
|
||||
foundtcp = true;
|
||||
M += 3;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(!foundtcp && !M.err) {
|
||||
// so far, not something which should actually happen
|
||||
log_printf(logio, PLVL_DEBUG, " Search w/o proto \"tcp\"\n");
|
||||
return true;
|
||||
}
|
||||
|
||||
// one Search message can include many PV names.
|
||||
uint16_t nchan=0;
|
||||
from_wire(M, nchan, be);
|
||||
|
||||
names.clear();
|
||||
names.reserve(nchan);
|
||||
|
||||
for(size_t i=0; i<nchan && !M.err; i++) {
|
||||
uint32_t id=0xffffffff; // poison
|
||||
size_t chlen;
|
||||
|
||||
auto mundge = M.pos;
|
||||
from_wire(M, id, be);
|
||||
from_wire(M, Size<size_t>(chlen), be);
|
||||
// inject nil for previous PV name
|
||||
*mundge = '\0';
|
||||
if(chlen<=M.size() && !M.err) {
|
||||
names.push_back(reinterpret_cast<const char*>(M.pos));
|
||||
}
|
||||
M += chlen;
|
||||
}
|
||||
|
||||
if(!M.err) {
|
||||
// ensure nil for final PV name
|
||||
*M.pos = '\0';
|
||||
|
||||
for(auto L : listeners) {
|
||||
if(L->searchCB) {
|
||||
(L->searchCB)(*this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case pva_app_msg::Beacon: {
|
||||
uint16_t port = 0;
|
||||
|
||||
_from_wire<12>(M, &beaconMsg.guid[0], false);
|
||||
M += 4; // skip flags, seq, and change count. unused
|
||||
from_wire(M, beaconMsg.server, be);
|
||||
from_wire(M, port, be);
|
||||
beaconMsg.server.setPort(port);
|
||||
|
||||
size_t protolen=0;
|
||||
from_wire(M, Size<size_t>(protolen), be);
|
||||
M += protolen; // ignore string
|
||||
|
||||
// ignore remaining "server status" blob
|
||||
|
||||
if(!M.err) {
|
||||
for(auto L : listeners) {
|
||||
if(L->beaconCB) {
|
||||
(L->beaconCB)(beaconMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
void handle(short ev)
|
||||
{
|
||||
log_printf(logio, PLVL_DEBUG, "UDP %p event %x\n", rx.ev, ev);
|
||||
if(!(ev&EV_READ))
|
||||
return;
|
||||
|
||||
for(unsigned i=0; i<4; i++)
|
||||
{
|
||||
osiSocklen_t alen = msg.src.size();
|
||||
|
||||
const int nrx = recvfrom(sock.sock, (char*)&buf[0], buf.size(), 0, &msg.src->sa, &alen);
|
||||
log_printf(logio, PLVL_DEBUG, "recvfrom() -> %d\n", nrx);
|
||||
|
||||
if(nrx<0) {
|
||||
int err = evutil_socket_geterror(sock.sock);
|
||||
if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) {
|
||||
// nothing to do here
|
||||
} else {
|
||||
log_printf(logio, PLVL_WARN, "UDP RX Error on %s : %s\n", name.c_str(),
|
||||
evutil_socket_error_to_string(err));
|
||||
}
|
||||
return; // wait for more I/O
|
||||
|
||||
} else if(nrx==0) {
|
||||
// maybe a zero (body) length packet?
|
||||
// maybe an OS error?
|
||||
return;
|
||||
}
|
||||
|
||||
log_hex_printf(logio, PLVL_DEBUG, &buf[0], nrx, "UDP Rx from %s", msg.src.tostring().c_str());
|
||||
|
||||
msgs.clear();
|
||||
|
||||
sbuf<const uint8_t> packet(&buf[0], size_t(nrx));
|
||||
|
||||
while(!packet.empty() && !packet.err) {
|
||||
// do validation early, before fanout.
|
||||
|
||||
// minimum header size is 8 bytes
|
||||
// ID byte must by 0xCA (because PVA has some paternal envy)
|
||||
// ignore incompatible version 0
|
||||
// UDP packets can't contain control messages, or use segmentation
|
||||
|
||||
if(packet.size()<8) {
|
||||
log_printf(logio, PLVL_INFO, "UDP ignore runt on %s\n", name.c_str());
|
||||
return;
|
||||
|
||||
} else if(packet[0]!=0xca || packet[1]==0 || (packet[2]&(pva_flags::Control|pva_flags::SegMask))) {
|
||||
|
||||
log_printf(logio, PLVL_INFO, "UDP ignore header%u %02x%02x%02x%02x on %s\n",
|
||||
unsigned(packet.size()), packet[0], packet[1], packet[2], packet[3],
|
||||
name.c_str());
|
||||
return; // better luck next time?
|
||||
}
|
||||
|
||||
auto save = packet;
|
||||
|
||||
bool be = packet[2]&pva_flags::MSB;
|
||||
packet += 4;
|
||||
uint32_t len=0;
|
||||
from_wire(packet, len, be);
|
||||
|
||||
if(len > packet.size() && !packet.err) {
|
||||
log_printf(logio, PLVL_INFO, "UDP ignore header%u %02x%02x%02x%02x on %s\n",
|
||||
unsigned(packet.size()), packet[0], packet[1], packet[2], packet[3],
|
||||
name.c_str());
|
||||
return;
|
||||
}
|
||||
msgs.push_back(save);
|
||||
packet += len;
|
||||
}
|
||||
|
||||
if(packet.err) {
|
||||
log_printf(logio, PLVL_WARN, "UDP packet decode fails. Ignoring\n");
|
||||
|
||||
} else if(!msgs.empty()) {
|
||||
msgs.emplace_back(nullptr, 0);
|
||||
msg.msgs = &msgs[0];
|
||||
|
||||
for(auto L : listeners)
|
||||
{
|
||||
if(!L->cb)
|
||||
continue;
|
||||
try {
|
||||
(L->cb)(msg);
|
||||
}catch(std::exception& e){
|
||||
log_printf(logio, PLVL_ERR, "Error in callback: %s\n", e.what());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handle up to 4 packets before going back to the reactor
|
||||
for(unsigned i=0; i<4 && handle_one(); i++) {}
|
||||
}
|
||||
static void handle_static(evutil_socket_t fd, short ev, void *raw)
|
||||
{
|
||||
@@ -150,6 +234,10 @@ struct UDPCollector : public std::enable_shared_from_this<UDPCollector>
|
||||
log_printf(logio, PLVL_CRIT, "Ignoring unhandled exception in UDPManager::handle(): %s\n", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
// Search interface
|
||||
public:
|
||||
virtual bool reply(const void *msg, size_t msglen) const override;
|
||||
};
|
||||
|
||||
|
||||
@@ -175,9 +263,11 @@ UDPCollector::UDPCollector(const std::shared_ptr<UDPManager::Pvt>& manager, cons
|
||||
,bind_addr(bind_addr)
|
||||
,sock(bind_addr.family(), SOCK_DGRAM, 0)
|
||||
,rx(manager->loop.base, sock.sock, EV_READ|EV_PERSIST, &handle_static, this)
|
||||
,buf(0x10000)
|
||||
,msg(this)
|
||||
,buf(0x10001)
|
||||
,beaconMsg(src)
|
||||
{
|
||||
beaconMsg.guid.resize(12);
|
||||
|
||||
epicsSocketEnableAddressUseForDatagramFanout(sock.sock);
|
||||
sock.bind(this->bind_addr);
|
||||
name = "UDP "+this->bind_addr.tostring();
|
||||
@@ -227,8 +317,8 @@ UDPManager UDPManager::instance()
|
||||
return UDPManager(ret);
|
||||
}
|
||||
|
||||
std::unique_ptr<UDPListener> UDPManager::subscribe(SockAddr& dest,
|
||||
std::function<void(const UDPMsg& msg)>&& cb)
|
||||
std::unique_ptr<UDPListener> UDPManager::onBeacon(SockAddr& dest,
|
||||
std::function<void(const Beacon&)>&& cb)
|
||||
{
|
||||
if(!pvt)
|
||||
throw std::invalid_argument("UDPManager null");
|
||||
@@ -238,38 +328,53 @@ std::unique_ptr<UDPListener> UDPManager::subscribe(SockAddr& dest,
|
||||
pvt->loop.call([this, &ret, &dest, &cb](){
|
||||
// from event loop worker
|
||||
|
||||
ret.reset(new UDPListener);
|
||||
ret->cb = std::move(cb);
|
||||
|
||||
if(dest.port()!=0) {
|
||||
auto it = pvt->collectors.find(dest);
|
||||
if(it!=pvt->collectors.end()) {
|
||||
try {
|
||||
ret->collector = it->second->shared_from_this();
|
||||
}catch(std::bad_weak_ptr&){
|
||||
// nothing to do
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(!ret->collector) {
|
||||
ret->collector.reset(new UDPCollector(pvt->shared_from_this(), dest));
|
||||
}
|
||||
|
||||
ret->collector->listeners.insert(ret.get());
|
||||
|
||||
ret->dest = dest;
|
||||
ret.reset(new UDPListener(pvt.get(), dest));
|
||||
ret->beaconCB = std::move(cb);
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
UDPListener::~UDPListener()
|
||||
std::unique_ptr<UDPListener> UDPManager::onSearch(SockAddr& dest,
|
||||
std::function<void(const Search&)>&& cb)
|
||||
{
|
||||
cancel();
|
||||
if(!pvt)
|
||||
throw std::invalid_argument("UDPManager null");
|
||||
|
||||
std::unique_ptr<UDPListener> ret;
|
||||
|
||||
pvt->loop.call([this, &ret, &dest, &cb](){
|
||||
// from event loop worker
|
||||
|
||||
ret.reset(new UDPListener(pvt.get(), dest));
|
||||
ret->searchCB = std::move(cb);
|
||||
});
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void UDPListener::cancel()
|
||||
UDPListener::UDPListener(UDPManager::Pvt *manager, const SockAddr &dest)
|
||||
:dest(dest)
|
||||
{
|
||||
if(dest.port()!=0) {
|
||||
auto it = manager->collectors.find(dest);
|
||||
if(it!=manager->collectors.end()) {
|
||||
try {
|
||||
collector = it->second->shared_from_this();
|
||||
}catch(std::bad_weak_ptr&){
|
||||
// nothing to do
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(!collector) {
|
||||
collector.reset(new UDPCollector(manager->shared_from_this(), dest));
|
||||
}
|
||||
|
||||
collector->listeners.insert(this);
|
||||
}
|
||||
|
||||
UDPListener::~UDPListener()
|
||||
{
|
||||
if(!collector)
|
||||
return;
|
||||
@@ -285,19 +390,15 @@ void UDPListener::cancel()
|
||||
// UDPManager may be destroyed at this point, which joins its event loop worker
|
||||
}
|
||||
|
||||
UDPMsg::UDPMsg(UDPCollector *collector)
|
||||
:collector(collector)
|
||||
{}
|
||||
|
||||
bool UDPMsg::reply(const void *msg, size_t msglen) const
|
||||
bool UDPCollector::reply(const void *msg, size_t msglen) const
|
||||
{
|
||||
int ntx = sendto(collector->sock.sock, (char*)msg, msglen, 0, &src->sa, src.size());
|
||||
int ntx = sendto(sock.sock, (char*)msg, msglen, 0, &src->sa, src.size());
|
||||
if(ntx<0) {
|
||||
int err = evutil_socket_geterror(collector->sock.sock);
|
||||
int err = evutil_socket_geterror(sock.sock);
|
||||
if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) {
|
||||
// nothing to do here
|
||||
} else {
|
||||
log_printf(logio, PLVL_WARN, "UDP TX Error on %s : %s\n", collector->name.c_str(),
|
||||
log_printf(logio, PLVL_WARN, "UDP TX Error on %s : %s\n", name.c_str(),
|
||||
evutil_socket_error_to_string(err));
|
||||
}
|
||||
return false; // wait for more I/O
|
||||
@@ -305,4 +406,12 @@ bool UDPMsg::reply(const void *msg, size_t msglen) const
|
||||
return size_t(ntx)==msglen;
|
||||
}
|
||||
|
||||
UDPManager::Search::~Search() {}
|
||||
|
||||
} // namespace pvxsimpl
|
||||
|
||||
namespace std {
|
||||
void default_delete<pvxsimpl::UDPListener>::operator()(pvxsimpl::UDPListener* listener) {
|
||||
delete listener;
|
||||
};
|
||||
} // namespace std
|
||||
|
||||
+31
-48
@@ -10,44 +10,27 @@
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <tuple>
|
||||
#include <vector>
|
||||
|
||||
#include <pvxs/version.h>
|
||||
#include "evhelper.h"
|
||||
|
||||
namespace pvxsimpl {
|
||||
struct UDPListener;
|
||||
} // namespace pvxsimpl
|
||||
|
||||
namespace std {
|
||||
template<>
|
||||
struct default_delete<pvxsimpl::UDPListener> {
|
||||
PVXS_API void operator()(pvxsimpl::UDPListener*);
|
||||
};
|
||||
} // namespace std
|
||||
|
||||
namespace pvxsimpl {
|
||||
|
||||
struct UDPCollector;
|
||||
struct UDPManager;
|
||||
|
||||
struct UDPMsg {
|
||||
//! peer (source) address
|
||||
SockAddr src;
|
||||
//! points to the first byte of each message in a packet, followed by an empty message
|
||||
const sbuf<const uint8_t>* msgs;
|
||||
|
||||
//! attempt to queue a reply message
|
||||
bool reply(const void *msg, size_t msglen) const;
|
||||
|
||||
private:
|
||||
UDPCollector *collector;
|
||||
friend struct UDPCollector;
|
||||
explicit UDPMsg(UDPCollector *collector);
|
||||
};
|
||||
|
||||
//! Represents a subscription to the UDPManager
|
||||
struct PVXS_API UDPListener {
|
||||
//! automatically cancel()s
|
||||
~UDPListener();
|
||||
//! Stop receiving packets. Caller blocks until any in-progress callback has returned
|
||||
void cancel();
|
||||
private:
|
||||
friend struct UDPCollector;
|
||||
friend struct UDPManager;
|
||||
SockAddr dest;
|
||||
std::shared_ptr<UDPCollector> collector;
|
||||
std::function<void(const UDPMsg& msg)> cb;
|
||||
};
|
||||
|
||||
//! Manage reception, fanout, and reply of UDP PVA on the well known port.
|
||||
struct PVXS_API UDPManager
|
||||
{
|
||||
@@ -55,25 +38,25 @@ struct PVXS_API UDPManager
|
||||
static UDPManager instance();
|
||||
~UDPManager();
|
||||
|
||||
/** Create subscription for UDP packets.
|
||||
*
|
||||
* The provided callback functor will be invoked from a shared internal worker thread.
|
||||
* The callback should not block this worker for an extended period of time.
|
||||
*
|
||||
* UDPMsg::msgs has already passed basic validation and it may be assumed that
|
||||
* for each message:
|
||||
*
|
||||
* 1. Is at least 8 bytes
|
||||
* 2. Is an application message w/o segmentation
|
||||
* 3. Payload size field is consistent with total packet length (if decoded with correct endianness)
|
||||
*
|
||||
* The provided functor will be destroyed during UDPListener::cancel() or ~UDPListener
|
||||
*
|
||||
* @param dest Address to bind this socket. Updated with actual address (cf. getsockname() ) after bind().
|
||||
* @param cb Called for each valid packet
|
||||
*/
|
||||
std::unique_ptr<UDPListener> subscribe(SockAddr& dest,
|
||||
std::function<void(const UDPMsg& msg)>&& cb);
|
||||
struct Beacon {
|
||||
SockAddr& src;
|
||||
SockAddr server;
|
||||
std::vector<uint8_t> guid;
|
||||
Beacon(SockAddr& src) :src(src) {}
|
||||
};
|
||||
std::unique_ptr<UDPListener> onBeacon(SockAddr& dest,
|
||||
std::function<void(const Beacon&)>&& cb);
|
||||
|
||||
struct PVXS_API Search {
|
||||
SockAddr src;
|
||||
SockAddr server;
|
||||
std::vector<const char*> names;
|
||||
|
||||
virtual bool reply(const void *msg, size_t msglen) const =0;
|
||||
virtual ~Search();
|
||||
};
|
||||
std::unique_ptr<UDPListener> onSearch(SockAddr& dest,
|
||||
std::function<void(const Search&)>&& cb);
|
||||
|
||||
explicit operator bool() const { return !!pvt; }
|
||||
|
||||
|
||||
+18
-141
@@ -195,154 +195,31 @@ int main(int argc, char *argv[])
|
||||
}
|
||||
}
|
||||
|
||||
auto cb = [&opts](const pva::UDPMsg& msg)
|
||||
auto searchCB = [&opts](const pva::UDPManager::Search& msg)
|
||||
{
|
||||
// later, from worker thread
|
||||
|
||||
// filter by sender
|
||||
if(!opts.peers.empty()) {
|
||||
if(msg.src.family()!=AF_INET)
|
||||
return;
|
||||
|
||||
bool match = false;
|
||||
for(auto& tup : opts.peers) {
|
||||
uint32_t addr, mask;
|
||||
std::tie(addr, mask) = tup;
|
||||
if((msg.src->in.sin_addr.s_addr&mask)==addr) {
|
||||
match = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(!match)
|
||||
return;
|
||||
}
|
||||
|
||||
bool showpeer=false;
|
||||
auto lazypeer = [&showpeer, &msg]() {
|
||||
if(!showpeer)
|
||||
log_printf(out, PLVL_INFO, "From %s\n", msg.src.tostring().c_str());
|
||||
showpeer = true;
|
||||
};
|
||||
|
||||
// allow that one UDP packet may contain several PVA messages
|
||||
for(unsigned i=0; !msg.msgs[i].empty(); i++)
|
||||
{
|
||||
auto M = msg.msgs[i];
|
||||
auto be = M[2]&pva::pva_flags::MSB;
|
||||
auto cmd = M[3];
|
||||
M+=4; // skip header
|
||||
uint32_t blen;
|
||||
pva::from_wire(M, blen, be);
|
||||
|
||||
switch(cmd) {
|
||||
case pva::pva_app_msg::OriginTag:
|
||||
log_printf(out, PLVL_WARN, "Peer sends ORIGIN_TAG by unicast/broadcast.\n");
|
||||
break;
|
||||
|
||||
case pva::pva_app_msg::Search: {
|
||||
uint32_t id;
|
||||
uint8_t flags;
|
||||
pva::SockAddr replyAddr;
|
||||
|
||||
pva::from_wire(M, id, be);
|
||||
pva::from_wire(M, flags, be);
|
||||
M += 3; // unused/reserved
|
||||
|
||||
pva::from_wire(M, replyAddr, be);
|
||||
uint16_t port = 0;
|
||||
pva::from_wire(M, port, be);
|
||||
replyAddr.setPort(port);
|
||||
|
||||
// so far, only "tcp" transport has ever been seen.
|
||||
// however, we will consider and ignore any others which might appear
|
||||
bool foundtcp = false;
|
||||
size_t nproto=0;
|
||||
pva::from_wire(M, pva::Size<size_t>(nproto), be);
|
||||
for(size_t i=0; i<nproto && !M.err; i++) {
|
||||
size_t nchar=0;
|
||||
pva::from_wire(M, pva::Size<size_t>(nchar), be);
|
||||
|
||||
if(M.size()>=3 && nchar==3 && M[0]=='t' && M[1]=='c' && M[2]=='p') {
|
||||
foundtcp = true;
|
||||
M += 3;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(!foundtcp && !M.err) {
|
||||
// so far, not something which should actually happen
|
||||
log_printf(out, PLVL_DEBUG, " Search w/o proto \"tcp\"\n");
|
||||
continue;
|
||||
}
|
||||
|
||||
// one Search message can include many PV names.
|
||||
uint16_t nchan=0;
|
||||
pva::from_wire(M, nchan, be);
|
||||
|
||||
for(size_t i=0; i<nchan && !M.err; i++) {
|
||||
uint32_t id=0xffffffff; // poison
|
||||
size_t chlen;
|
||||
|
||||
pva::from_wire(M, id, be);
|
||||
pva::from_wire(M, pva::Size<size_t>(chlen), be);
|
||||
if(opts.client && chlen<=M.size() && !M.err) {
|
||||
std::string pvname(reinterpret_cast<const char*>(M.pos), chlen);
|
||||
if(opts.pvnames.empty() || opts.pvnames.find(pvname)!=opts.pvnames.end()) {
|
||||
lazypeer();
|
||||
log_printf(out, PLVL_INFO, " Search 0x%08x '%s' (rsvp %s)\n",
|
||||
unsigned(id), pvname.c_str(), replyAddr.tostring().c_str());
|
||||
}
|
||||
}
|
||||
M += chlen;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case pva::pva_app_msg::Beacon: {
|
||||
uint8_t guid[12] = {};
|
||||
uint8_t seq =0;
|
||||
pva::SockAddr addr;
|
||||
uint16_t port = 0;
|
||||
|
||||
pva::_from_wire<sizeof(guid)>(M, guid, false);
|
||||
M += 1; // flags/qos. unused
|
||||
pva::from_wire(M, seq, be);
|
||||
M += 2; // "change" count. unused
|
||||
pva::from_wire(M, addr, be);
|
||||
pva::from_wire(M, port, be);
|
||||
addr.setPort(port);
|
||||
|
||||
size_t protolen=0;
|
||||
pva::from_wire(M, pva::Size<size_t>(protolen), be);
|
||||
M += protolen; // ignore string
|
||||
|
||||
// ignore remaining "server status" blob
|
||||
|
||||
if(opts.server && !M.err) {
|
||||
lazypeer();
|
||||
log_printf(out, PLVL_INFO, " Beacon %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x %s seq %u\n",
|
||||
guid[0], guid[1], guid[2], guid[3], guid[4], guid[5], guid[6], guid[7], guid[8], guid[9], guid[10], guid[11],
|
||||
addr.tostring().c_str(), seq);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
log_printf(out, PLVL_WARN, "unknown command 0x%02x\n", cmd);
|
||||
}
|
||||
|
||||
if(M.err) {
|
||||
log_printf(out, PLVL_ERR, " Error while decoding\n");
|
||||
}
|
||||
log_printf(out, PLVL_INFO, "%s Searching for:\n", msg.src.tostring().c_str());
|
||||
for(const auto pv : msg.names) {
|
||||
log_printf(out, PLVL_INFO, " \"%s\"\n", pv);
|
||||
}
|
||||
};
|
||||
|
||||
std::vector<std::unique_ptr<pva::UDPListener>> listeners;
|
||||
auto beaconCB = [&opts](const pva::UDPManager::Beacon& msg)
|
||||
{
|
||||
const auto& guid = msg.guid;
|
||||
log_printf(out, PLVL_INFO, "%s Beacon %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x %s\n",
|
||||
msg.src.tostring().c_str(),
|
||||
guid[0], guid[1], guid[2], guid[3], guid[4], guid[5], guid[6], guid[7], guid[8], guid[9], guid[10], guid[11],
|
||||
msg.server.tostring().c_str());
|
||||
|
||||
};
|
||||
|
||||
std::vector<std::tuple<std::unique_ptr<pva::UDPListener>, std::unique_ptr<pva::UDPListener>>> listeners;
|
||||
listeners.reserve(bindaddrs.size());
|
||||
|
||||
for(auto& baddr : bindaddrs) {
|
||||
listeners.push_back(pva::UDPManager::instance()
|
||||
.subscribe(baddr, cb));
|
||||
auto manager = pva::UDPManager::instance();
|
||||
listeners.emplace_back(manager.onSearch(baddr, searchCB),
|
||||
manager.onBeacon(baddr, beaconCB));
|
||||
log_printf(out, PLVL_DEBUG, "Bind: %s\n", baddr.tostring().c_str());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user