diff --git a/src/evhelper.cpp b/src/evhelper.cpp index f4fe90b..4abf875 100644 --- a/src/evhelper.cpp +++ b/src/evhelper.cpp @@ -244,35 +244,6 @@ void to_wire(sbuf& buf, const SockAddr &val, bool be) buf += 16; } -void from_wire(sbuf &buf, SockAddr& val, bool be) -{ - if(buf.err || buf.size()<16) { - buf.err = true; - return; - } - - // win32 lacks IN6_IS_ADDR_V4MAPPED() - bool ismapped = true; - for(unsigned i=0u; i<10; i++) - ismapped &= buf[i]==0; - ismapped &= buf[10]==0xff; - ismapped &= buf[11]==0xff; - - if(ismapped) { - val->in = {}; - val->in.sin_family = AF_INET; - memcpy(&val->in.sin_addr.s_addr, buf.pos+12, 4); - - } else { - val->in6 = {}; - val->in6.sin6_family = AF_INET6; - - static_assert (sizeof(val->in6.sin6_addr)==16, ""); - memcpy(&val->in6.sin6_addr, buf.pos, 16); - } - buf += 16; -} - evsocket::evsocket(evutil_socket_t sock) :sock(sock) { diff --git a/src/evhelper.h b/src/evhelper.h index 5458f2e..38cf0cf 100644 --- a/src/evhelper.h +++ b/src/evhelper.h @@ -22,6 +22,14 @@ #include "pvaproto.h" +// hooks for std::unique_ptr +namespace std { +template<> +struct default_delete { + inline void operator()(event* ev) { event_free(ev); } +}; +} + namespace pvxsimpl { using namespace pvxs; @@ -102,8 +110,35 @@ struct evlisten { PVXS_API void to_wire(sbuf& buf, const SockAddr& val, bool be); -PVXS_API -void from_wire(sbuf& buf, SockAddr& val, bool be); +template +void from_wire(sbuf &buf, SockAddr& val, bool be) +{ + if(buf.err || buf.size()<16) { + buf.err = true; + return; + } + + // win32 lacks IN6_IS_ADDR_V4MAPPED() + bool ismapped = true; + for(unsigned i=0u; i<10; i++) + ismapped &= buf[i]==0; + ismapped &= buf[10]==0xff; + ismapped &= buf[11]==0xff; + + if(ismapped) { + val->in = {}; + val->in.sin_family = AF_INET; + memcpy(&val->in.sin_addr.s_addr, buf.pos+12, 4); + + } else { + val->in6 = {}; + val->in6.sin6_family = AF_INET6; + + static_assert (sizeof(val->in6.sin6_addr)==16, ""); + memcpy(&val->in6.sin6_addr, buf.pos, 16); + } + buf += 16; +} struct PVXS_API evsocket { diff --git a/src/pvaproto.h b/src/pvaproto.h index eb414b7..46b510f 100644 --- a/src/pvaproto.h +++ b/src/pvaproto.h @@ -47,8 +47,8 @@ struct sbuf { } }; -template -inline void _from_wire(sbuf& buf, uint8_t *mem, bool reverse) +template +inline void _from_wire(sbuf& buf, uint8_t *mem, bool reverse) { if(buf.err || buf.size()& buf, uint8_t *mem, bool reverse) * @param val output variable * @param be true if value encoded in buf is in MSBF order, false if in LSBF order */ -template::value, int>::type =0> -inline void from_wire(sbuf& buf, T& val, bool be) +template::value, int>::type =0> +inline void from_wire(sbuf& buf, T& val, bool be) { union { T v; @@ -92,8 +92,31 @@ struct Size { explicit Size(T& size) :size(&size) {} }; -PVXS_API -void from_wire(sbuf& buf, Size size, bool be); +template +void from_wire(sbuf& buf, Size size, bool be) +{ + if(buf.err || buf.empty()) { + buf.err = true; + return; + } + uint8_t s=buf[0]; + buf+=1; + if(s<254) { + *size.size = s; + + } else if(s==255) { + // "null" size. not sure it is used. Replicate weirdness of pvDataCPP + *size.size = -1; + + } else if(s==254) { + uint32_t ls = 0; + from_wire(buf, ls, be); + *size.size = ls; + } else { + // unreachable + buf.err = true; + } +} template inline void _to_wire(sbuf& buf, const uint8_t *mem, bool reverse) diff --git a/src/server.cpp b/src/server.cpp index 7137706..0af1115 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -278,7 +278,7 @@ void Server::Pvt::start() for(auto& iface : interfaces) { auto addr = iface.bind_addr; addr.setPort(effective.default_udp); - iface.searchrx = manager.subscribe(addr, [](const UDPMsg& msg) { + iface.searchrx = manager.onSearch(addr, [](const UDPManager::Search& msg) { // TODO handle search }); } diff --git a/src/udp_collector.cpp b/src/udp_collector.cpp index 5ce2dbf..de99e81 100644 --- a/src/udp_collector.cpp +++ b/src/udp_collector.cpp @@ -31,7 +31,18 @@ namespace pvxsimpl { DEFINE_LOGGER(logio, "udp.io"); DEFINE_LOGGER(logsetup, "udp.setup"); -struct UDPCollector : public std::enable_shared_from_this +struct UDPListener : public std::enable_shared_from_this +{ + std::function searchCB; + std::function beaconCB; + std::shared_ptr collector; + const SockAddr dest; + UDPListener(UDPManager::Pvt *manager, const SockAddr& dest); + ~UDPListener(); +}; + +struct UDPCollector : public UDPManager::Search, + public std::enable_shared_from_this { const std::shared_ptr manager; SockAddr bind_addr; @@ -40,106 +51,179 @@ struct UDPCollector : public std::enable_shared_from_this evevent rx; std::vector buf; - std::vector > msgs; - UDPMsg msg; + + UDPManager::Beacon beaconMsg; std::set listeners; UDPCollector(const std::shared_ptr& manager, const SockAddr& bind_addr); ~UDPCollector(); + bool handle_one() + { + osiSocklen_t alen = src.size(); + + // For Search messages, we use PV name strings in-place by adding nils. + // Ensure one extra byte at the end of the buffer for a nil after the last PV name + const int nrx = recvfrom(sock.sock, (char*)&buf[0], buf.size()-1, 0, &src->sa, &alen); + log_printf(logio, PLVL_DEBUG, "recvfrom() -> %d\n", nrx); + + if(nrx<0) { + int err = evutil_socket_geterror(sock.sock); + if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) { + // nothing to do here + } else { + log_printf(logio, PLVL_WARN, "UDP RX Error on %s : %s\n", name.c_str(), + evutil_socket_error_to_string(err)); + } + return false; // wait for more I/O + + } else if(nrx<8) { + // maybe a zero (body) length packet? + // maybe an OS error? + + log_printf(logio, PLVL_INFO, "UDP ignore runt on %s\n", name.c_str()); + return true; + + } else if(buf[0]!=0xca || buf[1]==0 || (buf[2]&(pva_flags::Control|pva_flags::SegMask))) { + // minimum header size is 8 bytes + // ID byte must by 0xCA (because PVA has some paternal envy) + // ignore incompatible version 0 + // UDP packets can't contain control messages, or use segmentation + + log_printf(logio, PLVL_INFO, "UDP ignore header%u %02x%02x%02x%02x on %s\n", + unsigned(nrx), buf[0], buf[1], buf[2], buf[3], + name.c_str()); + return true; + } + + log_hex_printf(logio, PLVL_DEBUG, &buf[0], nrx, "UDP Rx from %s", src.tostring().c_str()); + + names.clear(); + + sbuf M(&buf[0], size_t(nrx)); + + uint8_t cmd = M[3]; + + bool be = M[2]&pva_flags::MSB; + M += 4; + uint32_t len=0; + from_wire(M, len, be); + + if(len > M.size() && !M.err) { + log_printf(logio, PLVL_INFO, "UDP ignore header%u %02x%02x%02x%02x on %s\n", + unsigned(M.size()), M[0], M[1], M[2], M[3], + name.c_str()); + return true; + } + + switch(cmd) { + + case pva_app_msg::Search: { + uint32_t id; + SockAddr replyAddr; + + from_wire(M, id, be); + M += 4; // flags and unused/reserved + + from_wire(M, replyAddr, be); + uint16_t port = 0; + from_wire(M, port, be); + replyAddr.setPort(port); + + // so far, only "tcp" transport has ever been seen. + // however, we will consider and ignore any others which might appear + bool foundtcp = false; + size_t nproto=0; + from_wire(M, Size(nproto), be); + for(size_t i=0; i(nchar), be); + + if(M.size()>=3 && nchar==3 && M[0]=='t' && M[1]=='c' && M[2]=='p') { + foundtcp = true; + M += 3; + break; + } + } + if(!foundtcp && !M.err) { + // so far, not something which should actually happen + log_printf(logio, PLVL_DEBUG, " Search w/o proto \"tcp\"\n"); + return true; + } + + // one Search message can include many PV names. + uint16_t nchan=0; + from_wire(M, nchan, be); + + names.clear(); + names.reserve(nchan); + + for(size_t i=0; i(chlen), be); + // inject nil for previous PV name + *mundge = '\0'; + if(chlen<=M.size() && !M.err) { + names.push_back(reinterpret_cast(M.pos)); + } + M += chlen; + } + + if(!M.err) { + // ensure nil for final PV name + *M.pos = '\0'; + + for(auto L : listeners) { + if(L->searchCB) { + (L->searchCB)(*this); + } + } + } + + break; + } + + case pva_app_msg::Beacon: { + uint16_t port = 0; + + _from_wire<12>(M, &beaconMsg.guid[0], false); + M += 4; // skip flags, seq, and change count. unused + from_wire(M, beaconMsg.server, be); + from_wire(M, port, be); + beaconMsg.server.setPort(port); + + size_t protolen=0; + from_wire(M, Size(protolen), be); + M += protolen; // ignore string + + // ignore remaining "server status" blob + + if(!M.err) { + for(auto L : listeners) { + if(L->beaconCB) { + (L->beaconCB)(beaconMsg); + } + } + } + } + break; + } + + return true; + } void handle(short ev) { log_printf(logio, PLVL_DEBUG, "UDP %p event %x\n", rx.ev, ev); if(!(ev&EV_READ)) return; - for(unsigned i=0; i<4; i++) - { - osiSocklen_t alen = msg.src.size(); - - const int nrx = recvfrom(sock.sock, (char*)&buf[0], buf.size(), 0, &msg.src->sa, &alen); - log_printf(logio, PLVL_DEBUG, "recvfrom() -> %d\n", nrx); - - if(nrx<0) { - int err = evutil_socket_geterror(sock.sock); - if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) { - // nothing to do here - } else { - log_printf(logio, PLVL_WARN, "UDP RX Error on %s : %s\n", name.c_str(), - evutil_socket_error_to_string(err)); - } - return; // wait for more I/O - - } else if(nrx==0) { - // maybe a zero (body) length packet? - // maybe an OS error? - return; - } - - log_hex_printf(logio, PLVL_DEBUG, &buf[0], nrx, "UDP Rx from %s", msg.src.tostring().c_str()); - - msgs.clear(); - - sbuf packet(&buf[0], size_t(nrx)); - - while(!packet.empty() && !packet.err) { - // do validation early, before fanout. - - // minimum header size is 8 bytes - // ID byte must by 0xCA (because PVA has some paternal envy) - // ignore incompatible version 0 - // UDP packets can't contain control messages, or use segmentation - - if(packet.size()<8) { - log_printf(logio, PLVL_INFO, "UDP ignore runt on %s\n", name.c_str()); - return; - - } else if(packet[0]!=0xca || packet[1]==0 || (packet[2]&(pva_flags::Control|pva_flags::SegMask))) { - - log_printf(logio, PLVL_INFO, "UDP ignore header%u %02x%02x%02x%02x on %s\n", - unsigned(packet.size()), packet[0], packet[1], packet[2], packet[3], - name.c_str()); - return; // better luck next time? - } - - auto save = packet; - - bool be = packet[2]&pva_flags::MSB; - packet += 4; - uint32_t len=0; - from_wire(packet, len, be); - - if(len > packet.size() && !packet.err) { - log_printf(logio, PLVL_INFO, "UDP ignore header%u %02x%02x%02x%02x on %s\n", - unsigned(packet.size()), packet[0], packet[1], packet[2], packet[3], - name.c_str()); - return; - } - msgs.push_back(save); - packet += len; - } - - if(packet.err) { - log_printf(logio, PLVL_WARN, "UDP packet decode fails. Ignoring\n"); - - } else if(!msgs.empty()) { - msgs.emplace_back(nullptr, 0); - msg.msgs = &msgs[0]; - - for(auto L : listeners) - { - if(!L->cb) - continue; - try { - (L->cb)(msg); - }catch(std::exception& e){ - log_printf(logio, PLVL_ERR, "Error in callback: %s\n", e.what()); - } - } - } - } - + // handle up to 4 packets before going back to the reactor + for(unsigned i=0; i<4 && handle_one(); i++) {} } static void handle_static(evutil_socket_t fd, short ev, void *raw) { @@ -150,6 +234,10 @@ struct UDPCollector : public std::enable_shared_from_this log_printf(logio, PLVL_CRIT, "Ignoring unhandled exception in UDPManager::handle(): %s\n", e.what()); } } + + // Search interface +public: + virtual bool reply(const void *msg, size_t msglen) const override; }; @@ -175,9 +263,11 @@ UDPCollector::UDPCollector(const std::shared_ptr& manager, cons ,bind_addr(bind_addr) ,sock(bind_addr.family(), SOCK_DGRAM, 0) ,rx(manager->loop.base, sock.sock, EV_READ|EV_PERSIST, &handle_static, this) - ,buf(0x10000) - ,msg(this) + ,buf(0x10001) + ,beaconMsg(src) { + beaconMsg.guid.resize(12); + epicsSocketEnableAddressUseForDatagramFanout(sock.sock); sock.bind(this->bind_addr); name = "UDP "+this->bind_addr.tostring(); @@ -227,8 +317,8 @@ UDPManager UDPManager::instance() return UDPManager(ret); } -std::unique_ptr UDPManager::subscribe(SockAddr& dest, - std::function&& cb) +std::unique_ptr UDPManager::onBeacon(SockAddr& dest, + std::function&& cb) { if(!pvt) throw std::invalid_argument("UDPManager null"); @@ -238,38 +328,53 @@ std::unique_ptr UDPManager::subscribe(SockAddr& dest, pvt->loop.call([this, &ret, &dest, &cb](){ // from event loop worker - ret.reset(new UDPListener); - ret->cb = std::move(cb); - - if(dest.port()!=0) { - auto it = pvt->collectors.find(dest); - if(it!=pvt->collectors.end()) { - try { - ret->collector = it->second->shared_from_this(); - }catch(std::bad_weak_ptr&){ - // nothing to do - } - } - } - - if(!ret->collector) { - ret->collector.reset(new UDPCollector(pvt->shared_from_this(), dest)); - } - - ret->collector->listeners.insert(ret.get()); - - ret->dest = dest; + ret.reset(new UDPListener(pvt.get(), dest)); + ret->beaconCB = std::move(cb); }); return ret; } -UDPListener::~UDPListener() +std::unique_ptr UDPManager::onSearch(SockAddr& dest, + std::function&& cb) { - cancel(); + if(!pvt) + throw std::invalid_argument("UDPManager null"); + + std::unique_ptr ret; + + pvt->loop.call([this, &ret, &dest, &cb](){ + // from event loop worker + + ret.reset(new UDPListener(pvt.get(), dest)); + ret->searchCB = std::move(cb); + }); + + return ret; } -void UDPListener::cancel() +UDPListener::UDPListener(UDPManager::Pvt *manager, const SockAddr &dest) + :dest(dest) +{ + if(dest.port()!=0) { + auto it = manager->collectors.find(dest); + if(it!=manager->collectors.end()) { + try { + collector = it->second->shared_from_this(); + }catch(std::bad_weak_ptr&){ + // nothing to do + } + } + } + + if(!collector) { + collector.reset(new UDPCollector(manager->shared_from_this(), dest)); + } + + collector->listeners.insert(this); +} + +UDPListener::~UDPListener() { if(!collector) return; @@ -285,19 +390,15 @@ void UDPListener::cancel() // UDPManager may be destroyed at this point, which joins its event loop worker } -UDPMsg::UDPMsg(UDPCollector *collector) - :collector(collector) -{} - -bool UDPMsg::reply(const void *msg, size_t msglen) const +bool UDPCollector::reply(const void *msg, size_t msglen) const { - int ntx = sendto(collector->sock.sock, (char*)msg, msglen, 0, &src->sa, src.size()); + int ntx = sendto(sock.sock, (char*)msg, msglen, 0, &src->sa, src.size()); if(ntx<0) { - int err = evutil_socket_geterror(collector->sock.sock); + int err = evutil_socket_geterror(sock.sock); if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) { // nothing to do here } else { - log_printf(logio, PLVL_WARN, "UDP TX Error on %s : %s\n", collector->name.c_str(), + log_printf(logio, PLVL_WARN, "UDP TX Error on %s : %s\n", name.c_str(), evutil_socket_error_to_string(err)); } return false; // wait for more I/O @@ -305,4 +406,12 @@ bool UDPMsg::reply(const void *msg, size_t msglen) const return size_t(ntx)==msglen; } +UDPManager::Search::~Search() {} + } // namespace pvxsimpl + +namespace std { +void default_delete::operator()(pvxsimpl::UDPListener* listener) { + delete listener; +}; +} // namespace std diff --git a/src/udp_collector.h b/src/udp_collector.h index 8d2cd13..920c793 100644 --- a/src/udp_collector.h +++ b/src/udp_collector.h @@ -10,44 +10,27 @@ #include #include #include +#include #include #include "evhelper.h" +namespace pvxsimpl { +struct UDPListener; +} // namespace pvxsimpl + +namespace std { +template<> +struct default_delete { + PVXS_API void operator()(pvxsimpl::UDPListener*); +}; +} // namespace std + namespace pvxsimpl { struct UDPCollector; struct UDPManager; -struct UDPMsg { - //! peer (source) address - SockAddr src; - //! points to the first byte of each message in a packet, followed by an empty message - const sbuf* msgs; - - //! attempt to queue a reply message - bool reply(const void *msg, size_t msglen) const; - -private: - UDPCollector *collector; - friend struct UDPCollector; - explicit UDPMsg(UDPCollector *collector); -}; - -//! Represents a subscription to the UDPManager -struct PVXS_API UDPListener { - //! automatically cancel()s - ~UDPListener(); - //! Stop receiving packets. Caller blocks until any in-progress callback has returned - void cancel(); -private: - friend struct UDPCollector; - friend struct UDPManager; - SockAddr dest; - std::shared_ptr collector; - std::function cb; -}; - //! Manage reception, fanout, and reply of UDP PVA on the well known port. struct PVXS_API UDPManager { @@ -55,25 +38,25 @@ struct PVXS_API UDPManager static UDPManager instance(); ~UDPManager(); - /** Create subscription for UDP packets. - * - * The provided callback functor will be invoked from a shared internal worker thread. - * The callback should not block this worker for an extended period of time. - * - * UDPMsg::msgs has already passed basic validation and it may be assumed that - * for each message: - * - * 1. Is at least 8 bytes - * 2. Is an application message w/o segmentation - * 3. Payload size field is consistent with total packet length (if decoded with correct endianness) - * - * The provided functor will be destroyed during UDPListener::cancel() or ~UDPListener - * - * @param dest Address to bind this socket. Updated with actual address (cf. getsockname() ) after bind(). - * @param cb Called for each valid packet - */ - std::unique_ptr subscribe(SockAddr& dest, - std::function&& cb); + struct Beacon { + SockAddr& src; + SockAddr server; + std::vector guid; + Beacon(SockAddr& src) :src(src) {} + }; + std::unique_ptr onBeacon(SockAddr& dest, + std::function&& cb); + + struct PVXS_API Search { + SockAddr src; + SockAddr server; + std::vector names; + + virtual bool reply(const void *msg, size_t msglen) const =0; + virtual ~Search(); + }; + std::unique_ptr onSearch(SockAddr& dest, + std::function&& cb); explicit operator bool() const { return !!pvt; } diff --git a/tools/pvxvct.cpp b/tools/pvxvct.cpp index 0fa0670..d6e76c9 100644 --- a/tools/pvxvct.cpp +++ b/tools/pvxvct.cpp @@ -195,154 +195,31 @@ int main(int argc, char *argv[]) } } - auto cb = [&opts](const pva::UDPMsg& msg) + auto searchCB = [&opts](const pva::UDPManager::Search& msg) { - // later, from worker thread - - // filter by sender - if(!opts.peers.empty()) { - if(msg.src.family()!=AF_INET) - return; - - bool match = false; - for(auto& tup : opts.peers) { - uint32_t addr, mask; - std::tie(addr, mask) = tup; - if((msg.src->in.sin_addr.s_addr&mask)==addr) { - match = true; - break; - } - } - if(!match) - return; - } - - bool showpeer=false; - auto lazypeer = [&showpeer, &msg]() { - if(!showpeer) - log_printf(out, PLVL_INFO, "From %s\n", msg.src.tostring().c_str()); - showpeer = true; - }; - - // allow that one UDP packet may contain several PVA messages - for(unsigned i=0; !msg.msgs[i].empty(); i++) - { - auto M = msg.msgs[i]; - auto be = M[2]&pva::pva_flags::MSB; - auto cmd = M[3]; - M+=4; // skip header - uint32_t blen; - pva::from_wire(M, blen, be); - - switch(cmd) { - case pva::pva_app_msg::OriginTag: - log_printf(out, PLVL_WARN, "Peer sends ORIGIN_TAG by unicast/broadcast.\n"); - break; - - case pva::pva_app_msg::Search: { - uint32_t id; - uint8_t flags; - pva::SockAddr replyAddr; - - pva::from_wire(M, id, be); - pva::from_wire(M, flags, be); - M += 3; // unused/reserved - - pva::from_wire(M, replyAddr, be); - uint16_t port = 0; - pva::from_wire(M, port, be); - replyAddr.setPort(port); - - // so far, only "tcp" transport has ever been seen. - // however, we will consider and ignore any others which might appear - bool foundtcp = false; - size_t nproto=0; - pva::from_wire(M, pva::Size(nproto), be); - for(size_t i=0; i(nchar), be); - - if(M.size()>=3 && nchar==3 && M[0]=='t' && M[1]=='c' && M[2]=='p') { - foundtcp = true; - M += 3; - break; - } - } - if(!foundtcp && !M.err) { - // so far, not something which should actually happen - log_printf(out, PLVL_DEBUG, " Search w/o proto \"tcp\"\n"); - continue; - } - - // one Search message can include many PV names. - uint16_t nchan=0; - pva::from_wire(M, nchan, be); - - for(size_t i=0; i(chlen), be); - if(opts.client && chlen<=M.size() && !M.err) { - std::string pvname(reinterpret_cast(M.pos), chlen); - if(opts.pvnames.empty() || opts.pvnames.find(pvname)!=opts.pvnames.end()) { - lazypeer(); - log_printf(out, PLVL_INFO, " Search 0x%08x '%s' (rsvp %s)\n", - unsigned(id), pvname.c_str(), replyAddr.tostring().c_str()); - } - } - M += chlen; - } - - break; - } - - case pva::pva_app_msg::Beacon: { - uint8_t guid[12] = {}; - uint8_t seq =0; - pva::SockAddr addr; - uint16_t port = 0; - - pva::_from_wire(M, guid, false); - M += 1; // flags/qos. unused - pva::from_wire(M, seq, be); - M += 2; // "change" count. unused - pva::from_wire(M, addr, be); - pva::from_wire(M, port, be); - addr.setPort(port); - - size_t protolen=0; - pva::from_wire(M, pva::Size(protolen), be); - M += protolen; // ignore string - - // ignore remaining "server status" blob - - if(opts.server && !M.err) { - lazypeer(); - log_printf(out, PLVL_INFO, " Beacon %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x %s seq %u\n", - guid[0], guid[1], guid[2], guid[3], guid[4], guid[5], guid[6], guid[7], guid[8], guid[9], guid[10], guid[11], - addr.tostring().c_str(), seq); - } - } - break; - - default: - log_printf(out, PLVL_WARN, "unknown command 0x%02x\n", cmd); - } - - if(M.err) { - log_printf(out, PLVL_ERR, " Error while decoding\n"); - } + log_printf(out, PLVL_INFO, "%s Searching for:\n", msg.src.tostring().c_str()); + for(const auto pv : msg.names) { + log_printf(out, PLVL_INFO, " \"%s\"\n", pv); } }; - std::vector> listeners; + auto beaconCB = [&opts](const pva::UDPManager::Beacon& msg) + { + const auto& guid = msg.guid; + log_printf(out, PLVL_INFO, "%s Beacon %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x %s\n", + msg.src.tostring().c_str(), + guid[0], guid[1], guid[2], guid[3], guid[4], guid[5], guid[6], guid[7], guid[8], guid[9], guid[10], guid[11], + msg.server.tostring().c_str()); + + }; + + std::vector, std::unique_ptr>> listeners; listeners.reserve(bindaddrs.size()); for(auto& baddr : bindaddrs) { - listeners.push_back(pva::UDPManager::instance() - .subscribe(baddr, cb)); + auto manager = pva::UDPManager::instance(); + listeners.emplace_back(manager.onSearch(baddr, searchCB), + manager.onBeacon(baddr, beaconCB)); log_printf(out, PLVL_DEBUG, "Bind: %s\n", baddr.tostring().c_str()); }