client: re-define the meaning of poke()
This commit is contained in:
+37
-13
@@ -27,25 +27,45 @@ typedef epicsGuardRelease<epicsMutex> UnGuard;
|
||||
namespace pvxs {
|
||||
namespace client {
|
||||
|
||||
namespace {
|
||||
/* "normal" tick interval for the search bucket ring, and "fast" interval
|
||||
* used for one revolution after a successful poke().
|
||||
*/
|
||||
constexpr timeval bucketInterval{1,0};
|
||||
constexpr timeval bucketIntervalFast{0,200000};
|
||||
// coalescence time for first search for a batch of newly created Channels
|
||||
constexpr timeval initialSearchDelay{0, 10000}; // 10 ms
|
||||
// number of buckets in the search ring
|
||||
constexpr size_t nBuckets = 30u;
|
||||
|
||||
// try not to fragment with usual MTU==1500
|
||||
/* our limit for UDP packet payload.
|
||||
* try not to fragment with usual MTU==1500 allowing for some overhead
|
||||
* by transport protocols. Ethernet+ip+udp headers add >= 42 bytes.
|
||||
* May be more with eg. IP header options, VLAN tag, etc.
|
||||
*/
|
||||
constexpr size_t maxSearchPayload = 1400;
|
||||
|
||||
/* Interval between checks for Channels which are no longer used by any operation.
|
||||
* Channels will be discarded if found to be unused by two consecutive checks.
|
||||
*/
|
||||
constexpr timeval channelCacheCleanInterval{10,0};
|
||||
|
||||
// time to wait before allowing another hurryUp().
|
||||
constexpr double pokeHoldoff = 30.0;
|
||||
|
||||
// limit on the number of GUIDs * protocols * addresses we will track
|
||||
constexpr size_t beaconTrackLimit{20000};
|
||||
|
||||
// interval between checks to discard servers which have stopped sending beacons
|
||||
constexpr timeval beaconCleanInterval{180, 0};
|
||||
|
||||
// special interval to attempt to reconnect to disconnected name servers
|
||||
constexpr timeval tcpNSCheckInterval{10, 0};
|
||||
|
||||
// searchSequenceID in CMD_SEARCH is redundant.
|
||||
// So we use a static value and instead rely on IDs for individual PVs
|
||||
constexpr uint32_t search_seq{0x66696e64}; // "find"
|
||||
} // namespace
|
||||
|
||||
Disconnect::Disconnect()
|
||||
:std::runtime_error("Disconnected")
|
||||
@@ -399,7 +419,7 @@ void Context::hurryUp()
|
||||
throw std::logic_error("NULL Context");
|
||||
|
||||
pvt->impl->manager.loop().call([this](){
|
||||
pvt->impl->poke(true);
|
||||
pvt->impl->poke();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -672,22 +692,22 @@ void ContextImpl::close()
|
||||
manager.sync();
|
||||
}
|
||||
|
||||
void ContextImpl::poke(bool force)
|
||||
void ContextImpl::poke()
|
||||
{
|
||||
{
|
||||
Guard G(pokeLock);
|
||||
if(poked)
|
||||
if(nPoked)
|
||||
return;
|
||||
|
||||
epicsTimeStamp now{};
|
||||
|
||||
double age = -1.0;
|
||||
if(!force && (epicsTimeGetCurrent(&now) || (age=epicsTimeDiffInSeconds(&now, &lastPoke))<30.0)) {
|
||||
if(epicsTimeGetCurrent(&now) || (age=epicsTimeDiffInSeconds(&now, &lastPoke))<pokeHoldoff) {
|
||||
log_debug_printf(setup, "Ignoring hurryUp() age=%.1f sec\n", age);
|
||||
return;
|
||||
}
|
||||
lastPoke = now;
|
||||
poked = true;
|
||||
nPoked = nBuckets;
|
||||
}
|
||||
|
||||
log_debug_printf(setup, "hurryUp()%s\n", "");
|
||||
@@ -782,7 +802,7 @@ void ContextImpl::onBeacon(const UDPManager::Beacon& msg)
|
||||
now
|
||||
});
|
||||
|
||||
poke(false);
|
||||
poke();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -973,7 +993,7 @@ void ContextImpl::onSearchS(evutil_socket_t fd, short evt, void *raw)
|
||||
}
|
||||
}
|
||||
|
||||
void ContextImpl::tickSearch(SearchKind kind)
|
||||
void ContextImpl::tickSearch(SearchKind kind, bool poked)
|
||||
{
|
||||
// If kind == SearchKind::discover, then this is a discovery ping.
|
||||
// these are really empty searches with must-reply set.
|
||||
@@ -1074,7 +1094,7 @@ void ContextImpl::tickSearch(SearchKind kind)
|
||||
count++;
|
||||
|
||||
size_t ninc = 0u;
|
||||
if(kind==SearchKind::check)
|
||||
if(kind==SearchKind::check && !poked)
|
||||
ninc = chan->nSearch = std::min(searchBuckets.size(), chan->nSearch+1u);
|
||||
auto next = (idx + ninc)%searchBuckets.size();
|
||||
auto nextnext = (next + 1u)%searchBuckets.size();
|
||||
@@ -1174,14 +1194,18 @@ void ContextImpl::tickSearchS(evutil_socket_t fd, short evt, void *raw)
|
||||
{
|
||||
auto self(static_cast<ContextImpl*>(raw));
|
||||
try {
|
||||
bool poke = false;
|
||||
{
|
||||
Guard G(self->pokeLock);
|
||||
self->poked = false;
|
||||
if(self->nPoked) {
|
||||
poke = true;
|
||||
self->nPoked--;
|
||||
}
|
||||
}
|
||||
|
||||
self->tickSearch(SearchKind::check);
|
||||
self->tickSearch(SearchKind::check, poke);
|
||||
|
||||
if(event_add(self->searchTimer.get(), &bucketInterval))
|
||||
if(event_add(self->searchTimer.get(), poke ? &bucketIntervalFast : &bucketInterval))
|
||||
log_err_printf(setup, "Error re-enabling search timer on\n%s", "");
|
||||
|
||||
}catch(std::exception& e){
|
||||
@@ -1194,7 +1218,7 @@ void ContextImpl::initialSearchS(evutil_socket_t fd, short evt, void *raw)
|
||||
auto self(static_cast<ContextImpl*>(raw));
|
||||
try {
|
||||
self->initialSearchScheduled = false;
|
||||
self->tickSearch(SearchKind::initial);
|
||||
self->tickSearch(SearchKind::initial, false);
|
||||
}catch(std::exception& e){
|
||||
log_exc_printf(io, "Unhandled error in initial search callback: %s\n", e.what());
|
||||
}
|
||||
|
||||
+1
-1
@@ -310,7 +310,7 @@ void Connection::handle_CONNECTION_VALIDATED()
|
||||
|
||||
if(nameserver) {
|
||||
log_info_printf(io, "(re)connected to nameserver %s\n", peerName.c_str());
|
||||
context->poke(true);
|
||||
context->poke();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -93,7 +93,7 @@ std::shared_ptr<Operation> DiscoverBuilder::exec()
|
||||
if(first && ping) {
|
||||
log_debug_printf(setup, "Starting Discover%s", "\n");
|
||||
|
||||
context->tickSearch(ContextImpl::SearchKind::discover);
|
||||
context->tickSearch(ContextImpl::SearchKind::discover, false);
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
+3
-3
@@ -269,7 +269,7 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
|
||||
// poked and beaconSenders from both TCP and UDP workers
|
||||
epicsMutex pokeLock;
|
||||
epicsTimeStamp lastPoke{};
|
||||
bool poked = false;
|
||||
size_t nPoked = 0u;
|
||||
|
||||
// unlike `poke`, `scheduleInitialSearch` is only ever called from the
|
||||
// tcp_loop so this does not need to be guarded by a mutex
|
||||
@@ -332,7 +332,7 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
|
||||
|
||||
void close();
|
||||
|
||||
void poke(bool force);
|
||||
void poke();
|
||||
|
||||
void serverEvent(const Discovered &evt);
|
||||
|
||||
@@ -343,7 +343,7 @@ struct ContextImpl : public std::enable_shared_from_this<ContextImpl>
|
||||
bool onSearch(evutil_socket_t fd);
|
||||
static void onSearchS(evutil_socket_t fd, short evt, void *raw);
|
||||
enum class SearchKind { discover, initial, check };
|
||||
void tickSearch(SearchKind kind);
|
||||
void tickSearch(SearchKind kind, bool poked);
|
||||
static void tickSearchS(evutil_socket_t fd, short evt, void *raw);
|
||||
static void initialSearchS(evutil_socket_t fd, short evt, void *raw);
|
||||
void tickBeaconClean();
|
||||
|
||||
Reference in New Issue
Block a user