From 7ae659678fa3bf401af0dea26fb8e1c7e4c77eed Mon Sep 17 00:00:00 2001 From: Thomas Ives Date: Mon, 3 Apr 2023 09:48:49 +0100 Subject: [PATCH] fix: do not re-search for other channels when doing initial channel search When creating a large number of Channels at once, we can end up calling `ContextImpl::poke(true)` many times in quick succession. This results in a flood of UDP broadcasts where we are searching for channels that we only just sent out the initial search request for. This can easily lead to packets getting lost and us not receiving a reply for some Channels. Moreover, as we keep resending search requests for Channels, we reschedule them further and further in the future (as `nSearch` is increased). After the dust settles and we stop poking, this can result in a wait of several seconds before a Channel which we have not found is searched for again. In this commit we avoid this issue by using a separate bucket to hold channels waiting for their initial search request. Rather than poking `tickSearch` to do the initial search and also resend requests for outstanding channels, we schedule a new call to `tickSearch` which will only send the initial search requests. As such, we will avoid rebroadcasting search requests for channels we have only just searched for. We have promoted the `discover` bool to an enum to distinguish between the now three different situations `tickSearch` can be called in. 
--- src/client.cpp | 55 ++++++++++++++++++++++++++++++------------ src/clientdiscover.cpp | 2 +- src/clientimpl.h | 12 ++++++++- 3 files changed, 52 insertions(+), 17 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index f89eb7c..58fe56a 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -340,9 +340,9 @@ std::shared_ptr Channel::build(const std::shared_ptr& cont context->chanByName[namekey] = chan; if(server.empty()) { - context->searchBuckets[context->currentBucket].push_back(chan); + context->initialSearchBucket.push_back(chan); - context->poke(true); + context->scheduleInitialSearch(); } else { // bypass search and connect so a specific server chan->forcedServer = forceServer; @@ -694,6 +694,19 @@ void ContextImpl::poke(bool force) throw std::runtime_error("Unable to schedule searchTimer"); } +void ContextImpl::scheduleInitialSearch() +{ + if (!initialSearchScheduled) + { + log_debug_printf(setup, "scheduleInitialSearch()%s\n", ""); + + initialSearchScheduled = true; + tcp_loop.dispatch([this]() { + tickSearch(SearchKind::initial); + }); + } +} + void ContextImpl::onBeacon(const UDPManager::Beacon& msg) { epicsTimeStamp now; @@ -958,27 +971,38 @@ void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw) } } -void ContextImpl::tickSearch(bool discover) +void ContextImpl::tickSearch(SearchKind kind) { - // If !discover, then this is a discovery ping. + // If kind == SearchKind::discover, then this is a discovery ping. // these are really empty searches with must-reply set. // So if !discover, then we should not be modifying any internal state - { + // + // If kind == SearchKind::initial we are sending the first search request + // for the channels in initalSearchBucket, and not resending requests for + // channels in the searchBuckets. + // + // If kind == SearchKind::check then we may have been poked. 
+ if (kind == SearchKind::check) { Guard G(pokeLock); poked = false; + } else if (kind == SearchKind::initial) { + initialSearchScheduled = false; } auto idx = currentBucket; - if(!discover) + if(kind == SearchKind::check) currentBucket = (currentBucket+1u)%searchBuckets.size(); log_debug_printf(io, "Search tick %zu\n", idx); decltype (searchBuckets)::value_type bucket; - if(!discover) + if (kind == SearchKind::initial) { + initialSearchBucket.swap(bucket); + } else if(kind == SearchKind::check) { searchBuckets[idx].swap(bucket); + } - while(!bucket.empty() || discover) { + while(!bucket.empty() || kind == SearchKind::discover) { // when 'discover' we only loop once searchMsg.resize(0x10000); @@ -991,7 +1015,8 @@ void ContextImpl::tickSearch(bool discover) // flags and reserved. // initially flags[7] is cleared (bcast) auto pflags = M.save(); - to_wire(M, uint8_t(discover ? pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search + to_wire(M, uint8_t(kind == SearchKind::discover ? 
+ pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search to_wire(M, uint8_t(0u)); to_wire(M, uint16_t(0u)); @@ -1004,7 +1029,7 @@ void ContextImpl::tickSearch(bool discover) auto pport = M.save(); to_wire(M, uint16_t(searchRxPort)); - if(discover) { + if(kind == SearchKind::discover) { to_wire(M, uint8_t(0u)); } else { @@ -1019,7 +1044,7 @@ void ContextImpl::tickSearch(bool discover) bool payload = false; while(!bucket.empty()) { - assert(!discover); + assert(kind != SearchKind::discover); auto chan = bucket.front().lock(); if(!chan || chan->state!=Channel::Searching) { @@ -1076,7 +1101,7 @@ void ContextImpl::tickSearch(bool discover) } assert(M.good()); - if(!payload && !discover) + if(!payload && kind != SearchKind::discover) break; { @@ -1144,18 +1169,18 @@ void ContextImpl::tickSearch(bool discover) // fail silently, will retry } - if(discover) + if(kind == SearchKind::discover) break; } - if(event_add(searchTimer.get(), &bucketInterval)) + if(kind != SearchKind::initial && event_add(searchTimer.get(), &bucketInterval)) log_err_printf(setup, "Error re-enabling search timer on\n%s", ""); } void ContextImpl::tickSearchS(evutil_socket_t fd, short evt, void *raw) { try { - static_cast(raw)->tickSearch(false); + static_cast(raw)->tickSearch(SearchKind::check); }catch(std::exception& e){ log_exc_printf(io, "Unhandled error in search timer callback: %s\n", e.what()); } diff --git a/src/clientdiscover.cpp b/src/clientdiscover.cpp index c128dac..900bd6a 100644 --- a/src/clientdiscover.cpp +++ b/src/clientdiscover.cpp @@ -93,7 +93,7 @@ std::shared_ptr DiscoverBuilder::exec() if(first && ping) { log_debug_printf(setup, "Starting Discover%s", "\n"); - context->tickSearch(true); + context->tickSearch(ContextImpl::SearchKind::discover); } }); diff --git a/src/clientimpl.h b/src/clientimpl.h index 802fb99..8a0506a 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -271,6 +271,10 @@ struct ContextImpl : public 
std::enable_shared_from_this epicsTimeStamp lastPoke{}; bool poked = false; + // unlike `poke`, `scheduleInitialSearch` is only ever called from the + // tcp_loop so this does not need to be guarded by a mutex + bool initialSearchScheduled = false; + // map: endpoint+proto -> Beaconer typedef std::pair BeaconServer; struct BeaconInfo { @@ -287,6 +291,9 @@ struct ContextImpl : public std::enable_shared_from_this std::vector> searchDest; size_t currentBucket = 0u; + // Channels where we have yet to send out an initial search request + std::list> initialSearchBucket; + // Channels where we are waiting for a search response std::vector>> searchBuckets; std::list > beaconRx; @@ -330,9 +337,12 @@ struct ContextImpl : public std::enable_shared_from_this void onBeacon(const UDPManager::Beacon& msg); + void scheduleInitialSearch(); + bool onSearch(evutil_socket_t fd); static void onSearchS(evutil_socket_t fd, short evt, void *raw); - void tickSearch(bool discover); + enum class SearchKind { discover, initial, check }; + void tickSearch(SearchKind kind); static void tickSearchS(evutil_socket_t fd, short evt, void *raw); void tickBeaconClean(); static void tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw);