From 609768a33dbbcb2503ace6a0c93adcbb480791bc Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 7 May 2020 18:44:41 -0700 Subject: [PATCH] add client channel cache Maintain unused channels for 20 seconds before closing. --- src/client.cpp | 65 ++++++++++++++++++++++++++++++++++++++++++++--- src/clientimpl.h | 9 ++++++- src/pvxs/client.h | 4 +++ test/testget.cpp | 3 +++ test/testinfo.cpp | 3 +++ test/testput.cpp | 3 +++ 6 files changed, 83 insertions(+), 4 deletions(-) diff --git a/src/client.cpp b/src/client.cpp index 3e4ec34..706cbb3 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -31,6 +31,8 @@ constexpr size_t nBuckets = 30u; constexpr size_t maxSearchPayload = 0x4000; +constexpr timeval channelCacheCleanInterval{10,0}; + constexpr timeval beaconCleanInterval{2*180, 0}; Disconnect::Disconnect() @@ -75,7 +77,6 @@ Channel::Channel(const std::shared_ptr& context, const std::string Channel::~Channel() { context->chanByCID.erase(cid); - context->chanByName.erase(name); // searchBuckets cleaned in tickSearch() if((state==Creating || state==Active) && conn && conn->bev) { { @@ -191,7 +192,8 @@ std::shared_ptr Channel::build(const std::shared_ptr& con auto it = context->chanByName.find(name); if(it!=context->chanByName.end()) { - chan = it->second.lock(); + chan = it->second; + chan->garbage = false; } if(!chan) { @@ -266,6 +268,18 @@ void Context::hurryUp() }); } +void Context::cacheClear() +{ + if(!pvt) + throw std::logic_error("NULL Context"); + + pvt->tcp_loop.call([this](){ + // run twice to ensure both mark and sweep of all unused channels + pvt->cacheClean(); + pvt->cacheClean(); + }); +} + static Value buildCAMethod() { @@ -286,6 +300,7 @@ Context::Pvt::Pvt(const Config& conf) ,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &Pvt::tickSearchS, this)) ,manager(UDPManager::instance()) ,beaconCleaner(event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &Pvt::tickBeaconCleanS, this)) + ,cacheCleaner(event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &Pvt::cacheCleanS, this)) { effective.expand(); @@ -362,6 +377,8 @@ Context::Pvt::Pvt(const Config& conf) log_err_printf(setup, "Error enabling search RX\n%s", ""); if(event_add(beaconCleaner.get(), &beaconCleanInterval)) log_err_printf(setup, "Error enabling beacon clean timer on\n%s", ""); + if(event_add(cacheCleaner.get(), &channelCacheCleanInterval)) + log_err_printf(setup, "Error enabling channel cache clean timer on\n%s", ""); } Context::Pvt::~Pvt() {} @@ -373,8 +390,11 @@ void Context::Pvt::close() (void)event_del(searchTimer.get()); (void)event_del(searchRx.get()); (void)event_del(beaconCleaner.get()); + (void)event_del(cacheCleaner.get()); - decltype (connByAddr) conns(std::move(connByAddr)); + auto conns(std::move(connByAddr)); + // explicitly break ref. loop of channel cache + auto chans(std::move(chanByName)); for(auto& pair : conns) { auto conn = pair.second.lock(); @@ -383,6 +403,11 @@ void Context::Pvt::close() conn->cleanup(); } + + conns.clear(); + chans.clear(); + + assert(internal_self.use_count()==1); }); tcp_loop.join(); @@ -739,6 +764,40 @@ void Context::Pvt::tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw) } } +void Context::Pvt::cacheClean() +{ + std::set trash; + + for(auto& pair : chanByName) { + if(pair.second.use_count()<=1) { + if(!pair.second->garbage) { + // mark for next sweep + log_debug_printf(setup, "Chan GC mark '%s'\n", pair.first.c_str()); + pair.second->garbage = true; + + } else { + // sweep + trash.insert(pair.first); + } + } + } + + // explicitly break ref. loop of channel cache + for(auto& name : trash) { + chanByName.erase(name); + log_debug_printf(setup, "Chan GC sweep '%s'\n", name.c_str()); + } +} + +void Context::Pvt::cacheCleanS(evutil_socket_t fd, short evt, void *raw) +{ + try { + static_cast(raw)->tickBeaconClean(); + }catch(std::exception& e){ + log_exc_printf(io, "Unhandled error in beacon cleaner timer callback: %s\n", e.what()); + } +} + } // namespace client } // namespace pvxs diff --git a/src/clientimpl.h b/src/clientimpl.h index 821eb45..cb81083 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -135,6 +135,8 @@ struct Channel { Active, } state = Searching; + bool garbage = false; + std::shared_ptr conn; uint32_t sid = 0u; @@ -193,7 +195,9 @@ struct Context::Pvt std::list > beaconRx; std::map> chanByCID; - std::map> chanByName; + // strong ref. loop through Channel::context + // explicitly broken by Context::close(), Context::cacheClear, or Context::Pvt::cacheClean() + std::map> chanByName; std::map> connByAddr; @@ -212,6 +216,7 @@ struct Context::Pvt UDPManager manager; const evevent beaconCleaner; + const evevent cacheCleaner; INST_COUNTER(ClientPvt); @@ -230,6 +235,8 @@ struct Context::Pvt static void tickSearchS(evutil_socket_t fd, short evt, void *raw); void tickBeaconClean(); static void tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw); + void cacheClean(); + static void cacheCleanS(evutil_socket_t fd, short evt, void *raw); }; } // namespace client diff --git a/src/pvxs/client.h b/src/pvxs/client.h index a8cb464..25ee6ee 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -358,6 +358,10 @@ public: */ void hurryUp(); + /** Immediately close unused channels and connections. + */ + void cacheClear(); + explicit operator bool() const { return pvt.operator bool(); } size_t use_count() const { return pvt.use_count(); } private: diff --git a/test/testget.cpp b/test/testget.cpp index 4f1b0dc..0fff359 100644 --- a/test/testget.cpp +++ b/test/testget.cpp @@ -84,6 +84,9 @@ struct Tester { } else { testSkip(1, "timeout"); } + + op.reset(); + cli.cacheClear(); } void loopback() diff --git a/test/testinfo.cpp b/test/testinfo.cpp index fef0e4b..e82791d 100644 --- a/test/testinfo.cpp +++ b/test/testinfo.cpp @@ -60,6 +60,9 @@ struct Tester { } else { testSkip(1, "timeout"); } + + op.reset(); + cli.cacheClear(); } void loopback() diff --git a/test/testput.cpp b/test/testput.cpp index 81dc568..2fb7c89 100644 --- a/test/testput.cpp +++ b/test/testput.cpp @@ -85,6 +85,9 @@ struct Tester : public TesterBase } else { testSkip(2, "timeout"); } + + op.reset(); + cli.cacheClear(); } void loopback(bool get)