fix: do not re-search for other channels when doing initial channel search

When creating a large number of Channels at once, we can end up calling
`ContextImpl::poke(true)` many times in quick succession.  This results
in a flood of UDP broadcasts where we are searching for channels that we
only just sent out the initial search request for.

This can easily lead to packets getting lost and us not receiving a
reply for some Channels.  Moreover, as we keep resending search requests
for Channels, we reschedule them further and further in the future (as
`nSearch` is increased).  After the dust settles and we stop poking,
this can result in a wait of several seconds before a Channel which we
have not found is searched for again.

In this commit we avoid this issue by using a separate bucket to hold
channels waiting for their initial search request.  Rather than poking
`tickSearch` to do the initial search and also resend requests for
outstanding channels, we schedule a separate call to `tickSearch` which
will only send the initial search requests.  As such, we will avoid
rebroadcasting search requests for channels we have only just searched
for.

We have promoted the `discover` bool to an enum to distinguish between
the now three different situations `tickSearch` can be called in.
This commit is contained in:
Thomas Ives
2023-04-03 09:48:49 +01:00
committed by Michael Davidsaver
parent ad9ba0f085
commit 7ae659678f
3 changed files with 52 additions and 17 deletions
+40 -15
View File
@@ -340,9 +340,9 @@ std::shared_ptr<Channel> Channel::build(const std::shared_ptr<ContextImpl>& cont
context->chanByName[namekey] = chan;
if(server.empty()) {
context->searchBuckets[context->currentBucket].push_back(chan);
context->initialSearchBucket.push_back(chan);
context->poke(true);
context->scheduleInitialSearch();
} else { // bypass search and connect so a specific server
chan->forcedServer = forceServer;
@@ -694,6 +694,19 @@ void ContextImpl::poke(bool force)
throw std::runtime_error("Unable to schedule searchTimer");
}
// Queue one tickSearch(SearchKind::initial) pass on tcp_loop.
//
// initialSearchScheduled acts as a "pass already pending" marker: while it is
// set, further calls return immediately, so a burst of calls results in only
// one dispatched pass.  This function only sets the marker; it is reset
// elsewhere (by the SearchKind::initial branch of tickSearch).
void ContextImpl::scheduleInitialSearch()
{
    if (initialSearchScheduled)
        return; // an initial search pass is already queued

    log_debug_printf(setup, "scheduleInitialSearch()%s\n", "");

    // Mark as pending before dispatching so repeated calls coalesce.
    initialSearchScheduled = true;
    tcp_loop.dispatch([this]() { tickSearch(SearchKind::initial); });
}
void ContextImpl::onBeacon(const UDPManager::Beacon& msg)
{
epicsTimeStamp now;
@@ -958,27 +971,38 @@ void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw)
}
}
void ContextImpl::tickSearch(bool discover)
void ContextImpl::tickSearch(SearchKind kind)
{
// If !discover, then this is a discovery ping.
// If kind == SearchKind::discover, then this is a discovery ping.
// these are really empty searches with must-reply set.
// So if !discover, then we should not be modifying any internal state
{
//
// If kind == SearchKind::initial we are sending the first search request
// for the channels in initialSearchBucket, and not resending requests for
// channels in the searchBuckets.
//
// If kind == SearchKind::check then we may have been poked.
if (kind == SearchKind::check) {
Guard G(pokeLock);
poked = false;
} else if (kind == SearchKind::initial) {
initialSearchScheduled = false;
}
auto idx = currentBucket;
if(!discover)
if(kind == SearchKind::check)
currentBucket = (currentBucket+1u)%searchBuckets.size();
log_debug_printf(io, "Search tick %zu\n", idx);
decltype (searchBuckets)::value_type bucket;
if(!discover)
if (kind == SearchKind::initial) {
initialSearchBucket.swap(bucket);
} else if(kind == SearchKind::check) {
searchBuckets[idx].swap(bucket);
}
while(!bucket.empty() || discover) {
while(!bucket.empty() || kind == SearchKind::discover) {
// when 'discover' we only loop once
searchMsg.resize(0x10000);
@@ -991,7 +1015,8 @@ void ContextImpl::tickSearch(bool discover)
// flags and reserved.
// initially flags[7] is cleared (bcast)
auto pflags = M.save();
to_wire(M, uint8_t(discover ? pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search
to_wire(M, uint8_t(kind == SearchKind::discover ?
pva_search_flags::MustReply : 0u)); // must-reply to discovery, ignore regular negative search
to_wire(M, uint8_t(0u));
to_wire(M, uint16_t(0u));
@@ -1004,7 +1029,7 @@ void ContextImpl::tickSearch(bool discover)
auto pport = M.save();
to_wire(M, uint16_t(searchRxPort));
if(discover) {
if(kind == SearchKind::discover) {
to_wire(M, uint8_t(0u));
} else {
@@ -1019,7 +1044,7 @@ void ContextImpl::tickSearch(bool discover)
bool payload = false;
while(!bucket.empty()) {
assert(!discover);
assert(kind != SearchKind::discover);
auto chan = bucket.front().lock();
if(!chan || chan->state!=Channel::Searching) {
@@ -1076,7 +1101,7 @@ void ContextImpl::tickSearch(bool discover)
}
assert(M.good());
if(!payload && !discover)
if(!payload && kind != SearchKind::discover)
break;
{
@@ -1144,18 +1169,18 @@ void ContextImpl::tickSearch(bool discover)
// fail silently, will retry
}
if(discover)
if(kind == SearchKind::discover)
break;
}
if(event_add(searchTimer.get(), &bucketInterval))
if(kind != SearchKind::initial && event_add(searchTimer.get(), &bucketInterval))
log_err_printf(setup, "Error re-enabling search timer on\n%s", "");
}
void ContextImpl::tickSearchS(evutil_socket_t fd, short evt, void *raw)
{
try {
static_cast<ContextImpl*>(raw)->tickSearch(false);
static_cast<ContextImpl*>(raw)->tickSearch(SearchKind::check);
}catch(std::exception& e){
log_exc_printf(io, "Unhandled error in search timer callback: %s\n", e.what());
}
+1 -1
View File
@@ -93,7 +93,7 @@ std::shared_ptr<Operation> DiscoverBuilder::exec()
if(first && ping) {
log_debug_printf(setup, "Starting Discover%s", "\n");
context->tickSearch(true);
context->tickSearch(ContextImpl::SearchKind::discover);
}
});
+11 -1
View File
@@ -271,6 +271,10 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
epicsTimeStamp lastPoke{};
bool poked = false;
// unlike `poke`, `scheduleInitialSearch` is only ever called from the
// tcp_loop so this does not need to be guarded by a mutex
bool initialSearchScheduled = false;
// map: endpoint+proto -> Beaconer
typedef std::pair<SockAddr, std::string> BeaconServer;
struct BeaconInfo {
@@ -287,6 +291,9 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
std::vector<std::pair<SockEndpoint, bool>> searchDest;
size_t currentBucket = 0u;
// Channels where we have yet to send out an initial search request
std::list<std::weak_ptr<Channel>> initialSearchBucket;
// Channels where we are waiting for a search response
std::vector<std::list<std::weak_ptr<Channel>>> searchBuckets;
std::list<std::unique_ptr<UDPListener> > beaconRx;
@@ -330,9 +337,12 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
void onBeacon(const UDPManager::Beacon& msg);
void scheduleInitialSearch();
bool onSearch(evutil_socket_t fd);
static void onSearchS(evutil_socket_t fd, short evt, void *raw);
void tickSearch(bool discover);
enum class SearchKind { discover, initial, check };
void tickSearch(SearchKind kind);
static void tickSearchS(evutil_socket_t fd, short evt, void *raw);
void tickBeaconClean();
static void tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw);