From bd9c3cc3387209ba211ef935ad607e69dea48466 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Mon, 30 Aug 2021 09:49:49 -0700 Subject: [PATCH] UDPListener add mcast --- src/udp_collector.cpp | 657 ++++++++++++++++++++++-------------------- src/udp_collector.h | 8 +- 2 files changed, 356 insertions(+), 309 deletions(-) diff --git a/src/udp_collector.cpp b/src/udp_collector.cpp index c0e65c6..371724c 100644 --- a/src/udp_collector.cpp +++ b/src/udp_collector.cpp @@ -35,8 +35,10 @@ struct UDPCollector : public UDPManager::Search, public std::enable_shared_from_this { UDPManager::Pvt* const manager; - SockAddr bind_addr; - SockAddr mcast_addr; + SockAddr bind_addr; // address our socket is bound to + SockAddr lo_mcast_addr; // destination endpoint for local mcast forwarding + SockAddr lo_addr; + std::set> mcast_grps; // mcast group+iface pairs which our socket has joined std::string name; evsocket sock; evevent rx; @@ -48,238 +50,33 @@ struct UDPCollector : public UDPManager::Search, std::set listeners; - // loopback collector for our port. - std::shared_ptr loSibling; - // for the loSibling, list of non-lo siblings for forwards - std::set otherSiblings; - - UDPCollector(UDPManager::Pvt* manager, const SockAddr& bind_addr); + UDPCollector(UDPManager::Pvt* manager, uint16_t port); ~UDPCollector(); - bool handle_one() - { - SockAddr dest; + void addListener(UDPListener *l); + void delListener(UDPListener *l); - buf.resize(0x10001); + bool handle_one(); - // For Search messages, we use PV name strings in-place by adding nils. - // Ensure one extra byte at the end of the buffer for a nil after the last PV name - recvfromx rx{sock.sock, (char*)&buf[0], buf.size()-1, &src, &dest}; - const int nrx = rx.call(); + enum origin_t { + Remote, // received from interface other than loopback + Loopback, // received through loopback + OriginTag, // payload of CMD_ORIGIN_TAG + }; - if(nrx>=0 && rx.ndrop!=0u && prevndrop!=rx.ndrop) { - log_debug_printf(logio, "UDP collector socket buffer overflowed %u -> %u\n", unsigned(prevndrop), unsigned(rx.ndrop)); - prevndrop = rx.ndrop; - } - - if(nrx<0) { - int err = evutil_socket_geterror(sock.sock); - if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) { - // nothing to do here - } else { - log_warn_printf(logio, "UDP RX Error on %s : %s\n", name.c_str(), - evutil_socket_error_to_string(err)); - } - return false; // wait for more I/O - - } - - log_hex_printf(logio, Level::Debug, &buf[0], nrx, "UDP %d Rx %d, %s -> %s (%s)\n", - sock.sock, nrx, src.tostring().c_str(), dest.tostring().c_str(), bind_addr.tostring().c_str()); - - process_one(dest, &buf[0], nrx, false); - return true; - } - - void process_one(const SockAddr& dest, const uint8_t* buf, int nrx, bool originated) - { - FixedBuf M(true, const_cast(buf), nrx); - Header head{}; - from_wire(M, head); // overwrites M.be - - if(!M.good() || (head.flags&(pva_flags::Control|pva_flags::SegMask))) { - // UDP packets can't contain control messages, or use segmentation - - log_hex_printf(logio, Level::Debug, &buf[0], nrx, "Ignore UDP message from %s\n", src.tostring().c_str()); - return; - } - - names.clear(); - - if(head.len > M.size() && M.good()) { - log_info_printf(logio, "UDP ignore header%u %02x%02x%02x%02x on %s\n", - unsigned(M.size()), M[0], M[1], M[2], M[3], - name.c_str()); - return; - } - - switch(head.cmd) { - - case CMD_SEARCH: { - uint8_t flags = 0; - SockAddr replyAddr; - uint16_t port = 0; - - from_wire(M, searchID); - auto save_flags = M.save(); - from_wire(M, flags); - mustReply = flags&pva_search_flags::MustReply; - - M.skip(3, __FILE__, __LINE__); // unused/reserved - - auto save_replyAddr = M.save(); - from_wire(M, replyAddr); - from_wire(M, port); - if(replyAddr.isAny()) { - replyAddr = src; - if(originated) { - log_err_printf(logio, "CMD_ORIGIN_TAG search with reply to sender never works%s", "\n"); - return; - } - } - replyAddr.setPort(port); - - if(M.good() && !originated && (flags&pva_search_flags::Unicast) && loSibling && dest.family()!=AF_UNSPEC) { - // clear unicast flag in forwarded message - *save_flags &= ~pva_search_flags::Unicast; - // recipient of forwarded message must use, and trust, replyAddr in body :( - { - FixedBuf R(M.be, save_replyAddr, 16u); - to_wire(R, replyAddr); - assert(R.good()); - } - loSibling->forwardM(dest, &buf[0], nrx); - return; - } - - // so far, only "tcp" transport has ever been seen. - // however, we will consider and ignore any others which might appear - bool foundtcp = false; - Size nproto{0}; - from_wire(M, nproto); - for(size_t i=0; i=3 && nchar.size==3 && M[0]=='t' && M[1]=='c' && M[2]=='p'; - M.skip(nchar.size, __FILE__, __LINE__); - } - - // one Search message can include many PV names. - uint16_t nchan=0; - from_wire(M, nchan); - - names.clear(); - names.reserve(nchan); - - for(size_t i=0; i(M.save()), id}); - } - M.skip(chlen.size, __FILE__, __LINE__); - } - - // used by our reply() - src = replyAddr; - - if(M.good()) { - // ensure nil for final PV name - *M.save() = '\0'; - - for(auto L : listeners) { - if(L->searchCB) { - (L->searchCB)(*this); - } - } - } - - break; - } - - case CMD_BEACON: { - uint16_t port = 0; - - _from_wire<12>(M, &beaconMsg.guid[0], false, __FILE__, __LINE__); - M.skip(4, __FILE__, __LINE__); // skip flags, seq, and change count. unused - from_wire(M, beaconMsg.server); - from_wire(M, port); - if(beaconMsg.server.isAny()) { - beaconMsg.server = src; - } - beaconMsg.server.setPort(port); - - std::string proto; - from_wire(M, proto); - - // ignore remaining "server status" blob - - if(M.good() && proto=="tcp") { - for(auto L : listeners) { - if(L->beaconCB) { - (L->beaconCB)(beaconMsg); - } - } - } - break; - } - - case CMD_ORIGIN_TAG: { - SockAddr origin; // aka. original destination - from_wire(M, origin); - M.skip(head.len-16u, __FILE__, __LINE__); - - // only allow one CMD_ORIGIN_TAG message per packet - // only accept when sent to the mcast address from the loopback address - // since we only join the mcast group on loopback this will hopefully - // frustrate attempts to inject CMD_ORIGIN_TAG externally. - if(M.good() && !originated && dest.compare(mcast_addr,false)==0 && src.isLO()) { - origin.setPort(bind_addr.port()); - - originated = true; - log_debug_printf(logio, "Accept as originated from %s\n", origin.tostring().c_str()); - for(auto sib : otherSiblings) { - sib->process_one(origin, M.save(), M.size(), true); - } - return; - } - log_debug_printf(logio, "Ignore originated from %s %c%c%c%c\n", - origin.tostring().c_str(), - M.good() ? 'T' : 'F', - !originated ? 'T' : 'F', - dest.compare(mcast_addr,false)==0 ? 'T' : 'F', - src.isLO() ? 'T' : 'F'); - - break; - } - - default: - break; // ignore unknown - } - } - void handle(short ev) - { - log_debug_printf(logio, "UDP %p event %x\n", rx.get(), ev); - if(!(ev&EV_READ)) - return; - - // handle up to 4 packets before going back to the reactor - for(unsigned i=0; i<4 && handle_one(); i++) {} - } + void process_one(const SockAddr& dest, const uint8_t* buf, size_t nrx, origin_t origin); static void handle_static(evutil_socket_t fd, short ev, void *raw) { (void)fd; + auto self = static_cast(raw); try { - static_cast(raw)->handle(ev); + log_debug_printf(logio, "UDP %p event %x\n", self->rx.get(), ev); + if(!(ev&EV_READ)) + return; + + // handle up to 4 packets before going back to the reactor + for(unsigned i=0; i<4 && self->handle_one(); i++) {} + }catch(std::exception& e) { log_crit_printf(logio, "Ignoring unhandled exception in UDPManager::handle(): %s\n", e.what()); } @@ -296,9 +93,10 @@ public: struct UDPManager::Pvt { evbase loop; + IfaceMap ifmap; // only manipulate from loop worker thread - std::map collectors; + std::map collectors; Pvt() :loop("PVXUDP", epicsThreadPriorityCAServerLow-4) @@ -314,7 +112,7 @@ struct UDPManager::Pvt { std::shared_ptr collector; if(dest.port()!=0) { - auto it = collectors.find(dest); + auto it = collectors.find(dest.port()); if(it!=collectors.end()) { try { collector = it->second->shared_from_this(); @@ -325,17 +123,18 @@ struct UDPManager::Pvt { } if(!collector) { - collector.reset(new UDPCollector(this, dest)); + collector.reset(new UDPCollector(this, dest.port())); } return collector; } }; -UDPCollector::UDPCollector(UDPManager::Pvt *manager, const SockAddr& requested_bind_addr) +UDPCollector::UDPCollector(UDPManager::Pvt *manager, uint16_t requested_port) :manager(manager) - ,bind_addr(requested_bind_addr) - ,mcast_addr(AF_INET, "224.0.0.128") - ,sock(requested_bind_addr.family(), SOCK_DGRAM, 0) + ,bind_addr(SockAddr::any(AF_INET, requested_port)) + ,lo_mcast_addr(bind_addr.family(), "224.0.0.128") + ,lo_addr(SockAddr::loopback(bind_addr.family())) + ,sock(bind_addr.family(), SOCK_DGRAM, 0) ,rx(event_new(manager->loop.base, sock.sock, EV_READ|EV_PERSIST, &handle_static, this)) ,beaconMsg(src) { @@ -344,14 +143,8 @@ UDPCollector::UDPCollector(UDPManager::Pvt *manager, const SockAddr& requested_b epicsSocketEnableAddressUseForDatagramFanout(sock.sock); enable_SO_RXQ_OVFL(sock.sock); enable_IP_PKTINFO(sock.sock); - sock.bind(bind_addr); - name = "UDP "+bind_addr.tostring(); - auto lo_addr(SockAddr::loopback(bind_addr.family(), bind_addr.port())); - assert(lo_addr.port()!=0u); - mcast_addr.setPort(bind_addr.port()); - - /* Bind this address to receive multicast over loopback, and also to send them. + /* Always bind to wildcard to receive all uni/broad/multicast, and also to send them. * Notes: * - Linux, it would be possible to bind to the mcast address in order to receive only * packets so destined. It would also be possible to send mcasts from a socket @@ -361,50 +154,324 @@ UDPCollector::UDPCollector(UDPManager::Pvt *manager, const SockAddr& requested_b * * So we take the least common denominator across all platforms, which is to bind to the wildcard. * This socket may then receive unicasts which need to be forwarded. - * Also broadcasts, which will be ignored (if no listeners). */ - auto mcast_bind_addr(SockAddr::any(AF_INET, bind_addr.port())); + sock.bind(bind_addr); + name = "UDP "+bind_addr.tostring(); - if(bind_addr!=mcast_bind_addr) { - loSibling = manager->collect(mcast_bind_addr); - assert(this!=loSibling.get()); + lo_mcast_addr.setPort(bind_addr.port()); + lo_addr.setPort(bind_addr.port()); - log_info_printf(logsetup, "Bound %d to %s with sibling %s\n", sock.sock, name.c_str(), loSibling->name.c_str()); + // join local group to receive + sock.mcast_join(lo_mcast_addr, lo_addr); + // setup for re-transmit + sock.mcast_ttl(1); // make default explicit, we will only send to lo_mcast_addr. + sock.mcast_loop(true); + sock.mcast_iface(lo_addr); - } else { - // join group to receive - sock.mcast_join(mcast_addr, lo_addr); - // setup for re-transmit - sock.mcast_ttl(1); // make default explicit - sock.mcast_loop(true); - sock.mcast_iface(lo_addr); + mcast_grps.emplace(lo_mcast_addr, lo_addr); - log_info_printf(logsetup, "Bound %d to %s as lo\n", sock.sock, name.c_str()); - } + log_info_printf(logsetup, "Bound %d to %s as lo\n", sock.sock, name.c_str()); if(event_add(rx.get(), nullptr)) throw std::runtime_error("Unable to create collector Rx event"); - if(loSibling) - loSibling->otherSiblings.insert(this); - - manager->collectors[bind_addr] = this; + manager->collectors[bind_addr.port()] = this; } UDPCollector::~UDPCollector() { manager->loop.assertInLoop(); - if(loSibling) - loSibling->otherSiblings.erase(this); - - manager->collectors.erase(bind_addr); + manager->collectors.erase(bind_addr.port()); // we should only be destroyed after that last listener has removed itself assert(listeners.empty()); manager->loop.assertInLoop(); } +void UDPCollector::addListener(UDPListener *l) +{ + for(const auto& mcast : l->mcasts) { + const auto tup(std::make_pair(mcast, l->dest)); + if(mcast_grps.find(tup)==mcast_grps.end()) { + mcast_grps.insert(tup); + + log_debug_printf(logsetup, "collector joining %s on %s\n", + mcast.tostring().c_str(), + l->dest.tostring().c_str()); + + sock.mcast_join(mcast, l->dest); + } + } + listeners.insert(l); +} + +void UDPCollector::delListener(UDPListener *l) +{ + listeners.erase(l); + + // TODO: bother to cleanup mcast group membership? +} + +// size of a CMD_ORIGIN_TAG prefix header +static constexpr size_t cmd_origin_tag_size = 8 + 16; + +bool UDPCollector::handle_one() +{ + SockAddr dest; + + buf.resize(cmd_origin_tag_size + 0x10000 + 1); + auto rxbuf = &buf[cmd_origin_tag_size]; + auto rxlen = buf.size()-cmd_origin_tag_size-1; + + // For Search messages, we use PV name strings in-place by adding nils. + // Ensure one extra byte at the end of the buffer for a nil after the last PV name + recvfromx rx{sock.sock, (char*)rxbuf, rxlen, &src, &dest}; + const int nrx = rx.call(); + + if(nrx>=0 && rx.ndrop!=0u && prevndrop!=rx.ndrop) { + log_debug_printf(logio, "UDP collector socket buffer overflowed %u -> %u\n", unsigned(prevndrop), unsigned(rx.ndrop)); + prevndrop = rx.ndrop; + } + + if(nrx<0) { + int err = evutil_socket_geterror(sock.sock); + if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) { + // nothing to do here + } else { + log_warn_printf(logio, "UDP RX Error on %s : %s\n", name.c_str(), + evutil_socket_error_to_string(err)); + } + return false; // wait for more I/O + + } + + log_hex_printf(logio, Level::Debug, rxbuf, nrx, "UDP %d Rx %d, %s -> %s @%u (%s)\n", + sock.sock, nrx, src.tostring().c_str(), dest.tostring().c_str(), unsigned(rx.dstif), bind_addr.tostring().c_str()); + + origin_t origin = manager->ifmap.has_address(rx.dstif, lo_addr) ? Loopback : Remote; + + process_one(dest, rxbuf, nrx, origin); + return true; +} + +void UDPCollector::process_one(const SockAddr &dest, const uint8_t *buf, size_t nrx, origin_t origin) +{ + FixedBuf M(true, const_cast(buf), nrx); + Header head{}; + from_wire(M, head); // overwrites M.be + + if(!M.good() || (head.flags&(pva_flags::Control|pva_flags::SegMask))) { + // UDP packets can't contain control messages, or use segmentation + + log_hex_printf(logio, Level::Debug, &buf[0], nrx, "Ignore UDP message from %s\n", src.tostring().c_str()); + return; + } + + names.clear(); + + if(head.len > M.size() && M.good()) { + log_info_printf(logio, "UDP ignore header%u %02x%02x%02x%02x on %s\n", + unsigned(M.size()), M[0], M[1], M[2], M[3], + name.c_str()); + return; + } + + switch(head.cmd) { + + case CMD_SEARCH: { + uint8_t flags = 0; + SockAddr replyAddr; + uint16_t port = 0; + + from_wire(M, searchID); + auto save_flags = M.save(); + from_wire(M, flags); + mustReply = flags&pva_search_flags::MustReply; + + M.skip(3, __FILE__, __LINE__); // unused/reserved + + auto save_replyAddr = M.save(); + from_wire(M, replyAddr); + from_wire(M, port); + if(replyAddr.isAny()) { + replyAddr = src; + if(origin==OriginTag) { + log_err_printf(logio, "CMD_ORIGIN_TAG search with reply to sender never works%s", "\n"); + return; + } + } + replyAddr.setPort(port); + + if(M.good() && origin==Loopback && (flags&pva_search_flags::Unicast) && dest.family()!=AF_UNSPEC) { + assert(buf==&this->buf[cmd_origin_tag_size]); + // clear unicast flag in forwarded message + *save_flags &= ~pva_search_flags::Unicast; + // recipient of forwarded message must use, and trust, replyAddr in body :( + { + FixedBuf R(M.be, save_replyAddr, 16u); + to_wire(R, replyAddr); + assert(R.good()); + } + forwardM(dest, buf, nrx); + return; + } + + // so far, only "tcp" transport has ever been seen. + // however, we will consider and ignore any others which might appear + bool foundtcp = false; + Size nproto{0}; + from_wire(M, nproto); + for(size_t i=0; i=3 && nchar.size==3 && M[0]=='t' && M[1]=='c' && M[2]=='p'; + M.skip(nchar.size, __FILE__, __LINE__); + } + + // one Search message can include many PV names. + uint16_t nchan=0; + from_wire(M, nchan); + + names.clear(); + names.reserve(nchan); + + for(size_t i=0; i(M.save()), id}); + } + M.skip(chlen.size, __FILE__, __LINE__); + } + + // used by our reply() + src = replyAddr; + + if(M.good()) { + // ensure nil for final PV name + *M.save() = '\0'; + + for(auto L : listeners) { + if(L->searchCB) { + (L->searchCB)(*this); + } + } + } + + break; + } + + case CMD_BEACON: { + uint16_t port = 0; + + _from_wire<12>(M, &beaconMsg.guid[0], false, __FILE__, __LINE__); + M.skip(4, __FILE__, __LINE__); // skip flags, seq, and change count. unused + from_wire(M, beaconMsg.server); + from_wire(M, port); + if(beaconMsg.server.isAny()) { + beaconMsg.server = src; + } + beaconMsg.server.setPort(port); + + std::string proto; + from_wire(M, proto); + + // ignore remaining "server status" blob + + if(M.good() && proto=="tcp") { + for(auto L : listeners) { + if(L->beaconCB) { + (L->beaconCB)(beaconMsg); + } + } + } + break; + } + + case CMD_ORIGIN_TAG: { + SockAddr originaddr; // aka. original destination + from_wire(M, originaddr); + M.skip(head.len-16u, __FILE__, __LINE__); + + // only allow one CMD_ORIGIN_TAG message per packet + // only accept when sent to the mcast address from the loopback address + // since we only join the mcast group on loopback this will hopefully + // frustrate attempts to inject CMD_ORIGIN_TAG externally. + if(M.good() && origin==Loopback && dest.compare(lo_mcast_addr,false)==0 && src.isLO()) { + originaddr.setPort(bind_addr.port()); + + process_one(originaddr, M.save(), M.size(), OriginTag); + + return; + } + log_debug_printf(logio, "Ignore originated from %s %c%c%c%c\n", + originaddr.tostring().c_str(), + M.good() ? 'T' : 'F', + origin==Loopback ? 'T' : 'F', + dest.compare(lo_mcast_addr,false)==0 ? 'T' : 'F', + src.isLO() ? 'T' : 'F'); + + break; + } + + default: + break; // ignore unknown + } +} + +void UDPCollector::forwardM(const SockAddr& origin, const uint8_t *pbuf, size_t plen) +{ + log_debug_printf(logio, "Forward as originated for %s\n", + origin.tostring().c_str()); + + assert(buf.size() > cmd_origin_tag_size); + assert(pbuf==&buf[cmd_origin_tag_size]); + + { + FixedBuf M(true, &buf[0], cmd_origin_tag_size); + + to_wire(M, Header{CMD_ORIGIN_TAG, 0, 16u}); + to_wire(M, origin); + assert(M.good()); + assert(M.save()==&buf[cmd_origin_tag_size]); + } + + src = lo_mcast_addr; + reply(&buf[0], cmd_origin_tag_size+plen); +} + +bool UDPCollector::reply(const void *msg, size_t msglen) const +{ + manager->loop.assertInLoop(); + + log_hex_printf(logio, Level::Debug, msg, msglen, "Send %s -> %s\n", + bind_addr.tostring().c_str(), src.tostring().c_str()); + + int ntx = sendto(sock.sock, (char*)msg, msglen, 0, &src->sa, src.size()); + if(ntx<0) { + int err = evutil_socket_geterror(sock.sock); + if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) { + // nothing to do here + } else { + log_warn_printf(logio, "UDP TX Error on %s -> %s : (%d) %s\n", + name.c_str(), src.tostring().c_str(), + err, evutil_socket_error_to_string(err)); + } + return false; // wait for more I/O + } + return size_t(ntx)==msglen; +} + static struct udp_gbl_t { epicsMutex lock; std::weak_ptr inst; @@ -425,7 +492,11 @@ epicsThreadOnceId collector_once = EPICS_THREAD_ONCE_INIT; void collector_init(void *unused) { (void)unused; - udp_gbl = new udp_gbl_t; + try { + udp_gbl = new udp_gbl_t; + }catch(std::exception& e){ + log_exc_printf(logsetup, "Unable to alloc udp_gbl: %s\n", e.what()); + } } } // namespace @@ -504,7 +575,7 @@ UDPListener::UDPListener(const std::shared_ptr &manager, SockAd manager->loop.assertInLoop(); collector = manager->collect(dest); - dest = collector->bind_addr; + dest.setPort(collector->bind_addr.port()); } UDPListener::~UDPListener() @@ -513,67 +584,37 @@ UDPListener::~UDPListener() // from event loop worker if(active) - collector->listeners.erase(this); + collector->delListener(this); collector.reset(); // destroy UDPCollector from worker }); } +void UDPListener::addMCast(const SockAddr& mcast) +{ + manager->loop.call([this, &mcast](){ + if(active) + throw std::logic_error("must addMCast() before start()"); + + collector->mcast_grps.emplace(mcast.withPort(collector->bind_addr.port()), + dest); + }); +} + void UDPListener::start(bool s) { manager->loop.call([this, s](){ if(s && !active) { - collector->listeners.insert(this); + collector->addListener(this); } else if(!s && active) { - collector->listeners.erase(this); + collector->delListener(this); } active = s; }); } -void UDPCollector::forwardM(const SockAddr& origin, const uint8_t* obuf, size_t olen) -{ - log_debug_printf(logio, "Forward as originated for %s\n", - origin.tostring().c_str()); - - buf.resize(8+16+olen); - { - FixedBuf M(true, &buf[0], buf.size()); - - to_wire(M, Header{CMD_ORIGIN_TAG, 0, 16u}); - to_wire(M, origin); - assert(M.good()); - memcpy(M.save(), obuf, olen); - } - - src = mcast_addr; - reply(&buf[0], buf.size()); -} - -bool UDPCollector::reply(const void *msg, size_t msglen) const -{ - manager->loop.assertInLoop(); - - log_hex_printf(logio, Level::Debug, msg, msglen, "Send %s -> %s\n", - bind_addr.tostring().c_str(), src.tostring().c_str()); - - int ntx = sendto(sock.sock, (char*)msg, msglen, 0, &src->sa, src.size()); - if(ntx<0) { - int err = evutil_socket_geterror(sock.sock); - if(err==SOCK_EWOULDBLOCK || err==EAGAIN || err==SOCK_EINTR) { - // nothing to do here - } else { - log_warn_printf(logio, "UDP TX Error on %s -> %s : (%d) %s\n", - name.c_str(), src.tostring().c_str(), - err, evutil_socket_error_to_string(err)); - } - return false; // wait for more I/O - } - return size_t(ntx)==msglen; -} - UDPManager::Search::~Search() {} }} // namespace pvxs::impl diff --git a/src/udp_collector.h b/src/udp_collector.h index 8736ddb..1802967 100644 --- a/src/udp_collector.h +++ b/src/udp_collector.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -83,11 +84,14 @@ private: class PVXS_API UDPListener { + friend struct UDPManager; + std::function searchCB; std::function beaconCB; const std::shared_ptr manager; std::shared_ptr collector; const SockAddr dest; + std::set mcasts; bool active; INST_COUNTER(UDPListener); @@ -95,10 +99,12 @@ class PVXS_API UDPListener friend struct UDPCollector; friend struct UDPManager; -public: UDPListener(const std::shared_ptr& manager, SockAddr& dest); +public: ~UDPListener(); + void addMCast(const SockAddr& mcast); + void start(bool s=true); inline void stop() { start(false); } };