diff --git a/src/client.cpp b/src/client.cpp index f89eb7c..58fe56a 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -340,9 +340,9 @@ std::shared_ptr Channel::build(const std::shared_ptr& cont context->chanByName[namekey] = chan; if(server.empty()) { - context->searchBuckets[context->currentBucket].push_back(chan); + context->initialSearchBucket.push_back(chan); - context->poke(true); + context->scheduleInitialSearch(); } else { // bypass search and connect so a specific server chan->forcedServer = forceServer; @@ -694,6 +694,19 @@ void ContextImpl::poke(bool force) throw std::runtime_error("Unable to schedule searchTimer"); } +void ContextImpl::scheduleInitialSearch() +{ + if (!initialSearchScheduled) + { + log_debug_printf(setup, "scheduleInitialSearch()%s\n", ""); + + initialSearchScheduled = true; + tcp_loop.dispatch([this]() { + tickSearch(SearchKind::initial); + }); + } +} + void ContextImpl::onBeacon(const UDPManager::Beacon& msg) { epicsTimeStamp now; @@ -958,27 +971,38 @@ void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw) } } -void ContextImpl::tickSearch(bool discover) +void ContextImpl::tickSearch(SearchKind kind) { - // If !discover, then this is a discovery ping. + // If kind == SearchKind::discover, then this is a discovery ping. // these are really empty searches with must-reply set. // So if !discover, then we should not be modifying any internal state - { + // + // If kind == SearchKind::initial we are sending the first search request + // for the channels in initalSearchBucket, and not resending requests for + // channels in the searchBuckets. + // + // If kind == SearchKind::check then we may have been poked. + if (kind == SearchKind::check) { Guard G(pokeLock); poked = false; + } else if (kind == SearchKind::initial) { + initialSearchScheduled = false; } auto idx = currentBucket; - if(!discover) + if(kind == SearchKind::check) currentBucket = (currentBucket+1u)%searchBuckets.size(); log_debug_printf(io, "Search tick %zu\n", idx); decltype (searchBuckets)::value_type bucket; - if(!discover) + if (kind == SearchKind::initial) { + initialSearchBucket.swap(bucket); + } else if(kind == SearchKind::check) { searchBuckets[idx].swap(bucket); + } - while(!bucket.empty() || discover) { + while(!bucket.empty() || kind == SearchKind::discover) { // when 'discover' we only loop once searchMsg.resize(0x10000); @@ -991,7 +1015,8 @@ void ContextImpl::tickSearch(bool discover) // flags and reserved. // initially flags[7] is cleared (bcast) auto pflags = M.save(); - to_wire(M, uint8_t(discover ? pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search + to_wire(M, uint8_t(kind == SearchKind::discover ? + pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search to_wire(M, uint8_t(0u)); to_wire(M, uint16_t(0u)); @@ -1004,7 +1029,7 @@ void ContextImpl::tickSearch(bool discover) auto pport = M.save(); to_wire(M, uint16_t(searchRxPort)); - if(discover) { + if(kind == SearchKind::discover) { to_wire(M, uint8_t(0u)); } else { @@ -1019,7 +1044,7 @@ void ContextImpl::tickSearch(bool discover) bool payload = false; while(!bucket.empty()) { - assert(!discover); + assert(kind != SearchKind::discover); auto chan = bucket.front().lock(); if(!chan || chan->state!=Channel::Searching) { @@ -1076,7 +1101,7 @@ void ContextImpl::tickSearch(bool discover) } assert(M.good()); - if(!payload && !discover) + if(!payload && kind != SearchKind::discover) break; { @@ -1144,18 +1169,18 @@ void ContextImpl::tickSearch(bool discover) // fail silently, will retry } - if(discover) + if(kind == SearchKind::discover) break; } - if(event_add(searchTimer.get(), &bucketInterval)) + if(kind != SearchKind::initial && event_add(searchTimer.get(), &bucketInterval)) log_err_printf(setup, "Error re-enabling search timer on\n%s", ""); } void ContextImpl::tickSearchS(evutil_socket_t fd, short evt, void *raw) { try { - static_cast(raw)->tickSearch(false); + static_cast(raw)->tickSearch(SearchKind::check); }catch(std::exception& e){ log_exc_printf(io, "Unhandled error in search timer callback: %s\n", e.what()); } diff --git a/src/clientdiscover.cpp b/src/clientdiscover.cpp index c128dac..900bd6a 100644 --- a/src/clientdiscover.cpp +++ b/src/clientdiscover.cpp @@ -93,7 +93,7 @@ std::shared_ptr DiscoverBuilder::exec() if(first && ping) { log_debug_printf(setup, "Starting Discover%s", "\n"); - context->tickSearch(true); + context->tickSearch(ContextImpl::SearchKind::discover); } }); diff --git a/src/clientimpl.h b/src/clientimpl.h index 802fb99..8a0506a 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -271,6 +271,10 @@ struct ContextImpl : public std::enable_shared_from_this epicsTimeStamp lastPoke{}; bool poked = false; + // unlike `poke`, `scheduleInitialSearch` is only ever called from the + // tcp_loop so this does not need to be guarded by a mutex + bool initialSearchScheduled = false; + // map: endpoint+proto -> Beaconer typedef std::pair BeaconServer; struct BeaconInfo { @@ -287,6 +291,9 @@ struct ContextImpl : public std::enable_shared_from_this std::vector> searchDest; size_t currentBucket = 0u; + // Channels where we have yet to send out an initial search request + std::list> initialSearchBucket; + // Channels where we are waiting for a search response std::vector>> searchBuckets; std::list > beaconRx; @@ -330,9 +337,12 @@ struct ContextImpl : public std::enable_shared_from_this void onBeacon(const UDPManager::Beacon& msg); + void scheduleInitialSearch(); + bool onSearch(evutil_socket_t fd); static void onSearchS(evutil_socket_t fd, short evt, void *raw); - void tickSearch(bool discover); + enum class SearchKind { discover, initial, check }; + void tickSearch(SearchKind kind); static void tickSearchS(evutil_socket_t fd, short evt, void *raw); void tickBeaconClean(); static void tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw);