From acfba6469ed3d9013a48602667da7ccc84a53625 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Sat, 22 Feb 2020 09:28:47 -0800 Subject: [PATCH] start client beacon rx --- src/client.cpp | 79 ++++++++++++++++++++++++++++++++++++++++++++--- src/clientimpl.h | 11 +++++++ src/config.cpp | 9 ++++-- src/pvxs/client.h | 8 ++++- src/server.cpp | 4 +++ 5 files changed, 103 insertions(+), 8 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index 48d5b7e..861d3eb 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -26,6 +26,8 @@ constexpr size_t nBuckets = 30u; constexpr size_t maxSearchPayload = 0x4000; +constexpr timeval beaconCleanInterval{2*180, 0}; + Disconnect::Disconnect() :std::runtime_error("Disconnected") {} @@ -180,14 +182,12 @@ Context::Pvt::Pvt(const Config& conf) ,tcp_loop("PVXCTCP", epicsThreadPriorityCAServerLow) ,searchRx(event_new(tcp_loop.base, searchTx.sock, EV_READ|EV_PERSIST, &Pvt::onSearchS, this)) ,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &Pvt::tickSearchS, this)) + ,beaconCleaner(event_new(tcp_loop.base, -1, EV_TIMEOUT, &Pvt::tickBeaconCleanS, this)) { effective.expand(); searchBuckets.resize(nBuckets); - if(effective.udp_port==0) - throw std::runtime_error("Client can't use UDP random port"); - std::set bcasts; { ELLLIST list = ELLLIST_INIT; @@ -242,14 +242,26 @@ Context::Pvt::Pvt(const Config& conf) searchDest.emplace_back(saddr, isucast); } - // TODO: receive beacons - //auto manager = UDPManager::instance(); + auto manager = UDPManager::instance(); + for(auto& iface : effective.interfaces) { + SockAddr addr(AF_INET, iface.c_str(), effective.udp_port); + log_info_printf(io, "Listening for beacons on %s\n", addr.tostring().c_str()); + beaconRx.push_back(manager.onBeacon(addr, [this](const UDPManager::Beacon& msg) { + onBeacon(msg); + })); + } + + for(auto& listener : beaconRx) { + listener->start(); + } if(event_add(searchTimer.get(), &bucketInterval)) log_err_printf(setup, "Error enabling search timer\n%s", ""); if(event_add(searchRx.get(), nullptr)) log_err_printf(setup, "Error enabling search RX\n%s", ""); + if(event_add(searchTimer.get(), &beaconCleanInterval)) + log_err_printf(setup, "Error enabling beacon clean timer on\n%s", ""); } Context::Pvt::~Pvt() {} @@ -291,6 +303,29 @@ void Context::Pvt::poke() throw std::runtime_error("Unable to schedule searchTimer"); } +void Context::Pvt::onBeacon(const UDPManager::Beacon& msg) +{ + const auto& guid = msg.guid; + + epicsTimeStamp now; + epicsTimeGetCurrent(&now); + + auto it = beaconSenders.find(msg.src); + if(it!=beaconSenders.end() && msg.guid==it->second.guid) { + it->second.lastRx = now; + return; + } + + beaconSenders.emplace(msg.src, BTrack{msg.guid, now}); + + log_debug_printf(io, "%s New server %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x %s\n", + msg.src.tostring().c_str(), + guid[0], guid[1], guid[2], guid[3], guid[4], guid[5], guid[6], guid[7], guid[8], guid[9], guid[10], guid[11], + msg.server.tostring().c_str()); + + poke(); +} + bool Context::Pvt::onSearch() { searchMsg.resize(0x10000); @@ -560,6 +595,40 @@ void Context::Pvt::tickSearchS(evutil_socket_t fd, short evt, void *raw) } } +void Context::Pvt::tickBeaconClean() +{ + epicsTimeStamp now; + epicsTimeGetCurrent(&now); + + auto it = beaconSenders.begin(); + while(it!=beaconSenders.end()) { + auto cur = it++; + + double age = epicsTimeDiffInSeconds(&now, &cur->second.lastRx); + + if(age < -15.0 || age > 2.1*180.0) { + auto& guid = cur->second.guid; + log_debug_printf(io, "Lost server %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x %s\n", + guid[0], guid[1], guid[2], guid[3], guid[4], guid[5], guid[6], guid[7], guid[8], guid[9], guid[10], guid[11], + cur->first.tostring().c_str()); + + beaconSenders.erase(cur); + } + } + + if(event_add(searchTimer.get(), &beaconCleanInterval)) + log_err_printf(setup, "Error re-enabling beacon clean timer on\n%s", ""); +} + +void Context::Pvt::tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw) +{ + try { + static_cast(raw)->tickBeaconClean(); + }catch(std::exception& e){ + log_crit_printf(io, "Unhandled error in beacon cleaner timer callback: %s\n", e.what()); + } +} + } // namespace client } // namespace pvxs diff --git a/src/clientimpl.h b/src/clientimpl.h index d37828d..444448e 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -148,6 +148,12 @@ struct Context::Pvt std::list > beaconRx; + struct BTrack { + std::array guid; + epicsTimeStamp lastRx; + }; + std::map beaconSenders; + std::map> chanByCID; std::map> chanByName; @@ -156,6 +162,7 @@ struct Context::Pvt evbase tcp_loop; const evevent searchRx; const evevent searchTimer; + const evevent beaconCleaner; Pvt(const Config& conf); ~Pvt(); @@ -164,10 +171,14 @@ struct Context::Pvt void poke(); + void onBeacon(const UDPManager::Beacon& msg); + bool onSearch(); static void onSearchS(evutil_socket_t fd, short evt, void *raw); void tickSearch(); static void tickSearchS(evutil_socket_t fd, short evt, void *raw); + void tickBeaconClean(); + static void tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw); }; } // namespace client diff --git a/src/config.cpp b/src/config.cpp index 019a19d..bd4a669 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -269,9 +269,14 @@ Config Config::from_env() void Config::expand() { + if(udp_port==0) + throw std::runtime_error("Client can't use UDP random port"); + + if(interfaces.empty()) + interfaces.emplace_back("0.0.0.0"); + if(autoAddrList) { - std::vector all({"0.0.0.0"}); - expandAddrList(all, addressList); + expandAddrList(interfaces, addressList); autoAddrList = false; } diff --git a/src/pvxs/client.h b/src/pvxs/client.h index 2c03909..16e64bc 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -226,11 +226,17 @@ private: }; struct PVXS_API Config { + //! List of unicast and broadcast addresses std::vector addressList; + //! List of interface addresses on which beacons may be received. + //! Also constrains autoAddrList to only consider broadcast addresses of listed interfaces. + //! Empty implies wildcard 0.0.0.0 + std::vector interfaces; + //! UDP port to bind. Default is 5076. May be zero, cf. Server::config() to find allocated port. unsigned short udp_port = 5076; - //! Whether to populate the beacon address list automatically. (recommended) + //! Whether to extend the addressList with local interface broadcast addresses. (recommended) bool autoAddrList = true; //! Default configuration using process environment diff --git a/src/server.cpp b/src/server.cpp index 19fef6e..990f473 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -147,6 +147,7 @@ client::Config Server::clientConfig() const client::Config ret; ret.udp_port = pvt->effective.udp_port; + ret.interfaces = pvt->effective.interfaces; ret.addressList = pvt->effective.interfaces; ret.autoAddrList = false; @@ -515,6 +516,9 @@ void Server::Pvt::doBeacons(short evt) } else if(unsigned(ntx)