add client channel cache
Maintain unused channels for 20 seconds before closing.
This commit is contained in:
+62
-3
@@ -31,6 +31,8 @@ constexpr size_t nBuckets = 30u;
|
||||
|
||||
constexpr size_t maxSearchPayload = 0x4000;
|
||||
|
||||
constexpr timeval channelCacheCleanInterval{10,0};
|
||||
|
||||
constexpr timeval beaconCleanInterval{2*180, 0};
|
||||
|
||||
Disconnect::Disconnect()
|
||||
@@ -75,7 +77,6 @@ Channel::Channel(const std::shared_ptr<Context::Pvt>& context, const std::string
|
||||
Channel::~Channel()
|
||||
{
|
||||
context->chanByCID.erase(cid);
|
||||
context->chanByName.erase(name);
|
||||
// searchBuckets cleaned in tickSearch()
|
||||
if((state==Creating || state==Active) && conn && conn->bev) {
|
||||
{
|
||||
@@ -191,7 +192,8 @@ std::shared_ptr<Channel> Channel::build(const std::shared_ptr<Context::Pvt>& con
|
||||
|
||||
auto it = context->chanByName.find(name);
|
||||
if(it!=context->chanByName.end()) {
|
||||
chan = it->second.lock();
|
||||
chan = it->second;
|
||||
chan->garbage = false;
|
||||
}
|
||||
|
||||
if(!chan) {
|
||||
@@ -266,6 +268,18 @@ void Context::hurryUp()
|
||||
});
|
||||
}
|
||||
|
||||
void Context::cacheClear()
|
||||
{
|
||||
if(!pvt)
|
||||
throw std::logic_error("NULL Context");
|
||||
|
||||
pvt->tcp_loop.call([this](){
|
||||
// run twice to ensure both mark and sweep of all unused channels
|
||||
pvt->cacheClean();
|
||||
pvt->cacheClean();
|
||||
});
|
||||
}
|
||||
|
||||
static
|
||||
Value buildCAMethod()
|
||||
{
|
||||
@@ -286,6 +300,7 @@ Context::Pvt::Pvt(const Config& conf)
|
||||
,searchTimer(event_new(tcp_loop.base, -1, EV_TIMEOUT, &Pvt::tickSearchS, this))
|
||||
,manager(UDPManager::instance())
|
||||
,beaconCleaner(event_new(manager.loop().base, -1, EV_TIMEOUT|EV_PERSIST, &Pvt::tickBeaconCleanS, this))
|
||||
,cacheCleaner(event_new(tcp_loop.base, -1, EV_TIMEOUT|EV_PERSIST, &Pvt::cacheCleanS, this))
|
||||
{
|
||||
effective.expand();
|
||||
|
||||
@@ -362,6 +377,8 @@ Context::Pvt::Pvt(const Config& conf)
|
||||
log_err_printf(setup, "Error enabling search RX\n%s", "");
|
||||
if(event_add(beaconCleaner.get(), &beaconCleanInterval))
|
||||
log_err_printf(setup, "Error enabling beacon clean timer on\n%s", "");
|
||||
if(event_add(cacheCleaner.get(), &channelCacheCleanInterval))
|
||||
log_err_printf(setup, "Error enabling channel cache clean timer on\n%s", "");
|
||||
}
|
||||
|
||||
Context::Pvt::~Pvt() {}
|
||||
@@ -373,8 +390,11 @@ void Context::Pvt::close()
|
||||
(void)event_del(searchTimer.get());
|
||||
(void)event_del(searchRx.get());
|
||||
(void)event_del(beaconCleaner.get());
|
||||
(void)event_del(cacheCleaner.get());
|
||||
|
||||
decltype (connByAddr) conns(std::move(connByAddr));
|
||||
auto conns(std::move(connByAddr));
|
||||
// explicitly break ref. loop of channel cache
|
||||
auto chans(std::move(chanByName));
|
||||
|
||||
for(auto& pair : conns) {
|
||||
auto conn = pair.second.lock();
|
||||
@@ -383,6 +403,11 @@ void Context::Pvt::close()
|
||||
|
||||
conn->cleanup();
|
||||
}
|
||||
|
||||
conns.clear();
|
||||
chans.clear();
|
||||
|
||||
assert(internal_self.use_count()==1);
|
||||
});
|
||||
|
||||
tcp_loop.join();
|
||||
@@ -739,6 +764,40 @@ void Context::Pvt::tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw)
|
||||
}
|
||||
}
|
||||
|
||||
void Context::Pvt::cacheClean()
|
||||
{
|
||||
std::set<std::string> trash;
|
||||
|
||||
for(auto& pair : chanByName) {
|
||||
if(pair.second.use_count()<=1) {
|
||||
if(!pair.second->garbage) {
|
||||
// mark for next sweep
|
||||
log_debug_printf(setup, "Chan GC mark '%s'\n", pair.first.c_str());
|
||||
pair.second->garbage = true;
|
||||
|
||||
} else {
|
||||
// sweep
|
||||
trash.insert(pair.first);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// explicitly break ref. loop of channel cache
|
||||
for(auto& name : trash) {
|
||||
chanByName.erase(name);
|
||||
log_debug_printf(setup, "Chan GC sweep '%s'\n", name.c_str());
|
||||
}
|
||||
}
|
||||
|
||||
void Context::Pvt::cacheCleanS(evutil_socket_t fd, short evt, void *raw)
|
||||
{
|
||||
try {
|
||||
static_cast<Pvt*>(raw)->tickBeaconClean();
|
||||
}catch(std::exception& e){
|
||||
log_exc_printf(io, "Unhandled error in beacon cleaner timer callback: %s\n", e.what());
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace client
|
||||
|
||||
} // namespace pvxs
|
||||
|
||||
+8
-1
@@ -135,6 +135,8 @@ struct Channel {
|
||||
Active,
|
||||
} state = Searching;
|
||||
|
||||
bool garbage = false;
|
||||
|
||||
std::shared_ptr<Connection> conn;
|
||||
uint32_t sid = 0u;
|
||||
|
||||
@@ -193,7 +195,9 @@ struct Context::Pvt
|
||||
std::list<std::unique_ptr<UDPListener> > beaconRx;
|
||||
|
||||
std::map<uint32_t, std::weak_ptr<Channel>> chanByCID;
|
||||
std::map<std::string, std::weak_ptr<Channel>> chanByName;
|
||||
// strong ref. loop through Channel::context
|
||||
// explicitly broken by Context::close(), Context::cacheClear, or Context::Pvt::cacheClean()
|
||||
std::map<std::string, std::shared_ptr<Channel>> chanByName;
|
||||
|
||||
std::map<SockAddr, std::weak_ptr<Connection>> connByAddr;
|
||||
|
||||
@@ -212,6 +216,7 @@ struct Context::Pvt
|
||||
UDPManager manager;
|
||||
|
||||
const evevent beaconCleaner;
|
||||
const evevent cacheCleaner;
|
||||
|
||||
INST_COUNTER(ClientPvt);
|
||||
|
||||
@@ -230,6 +235,8 @@ struct Context::Pvt
|
||||
static void tickSearchS(evutil_socket_t fd, short evt, void *raw);
|
||||
void tickBeaconClean();
|
||||
static void tickBeaconCleanS(evutil_socket_t fd, short evt, void *raw);
|
||||
void cacheClean();
|
||||
static void cacheCleanS(evutil_socket_t fd, short evt, void *raw);
|
||||
};
|
||||
|
||||
} // namespace client
|
||||
|
||||
@@ -358,6 +358,10 @@ public:
|
||||
*/
|
||||
void hurryUp();
|
||||
|
||||
/** Immediately close unused channels and connections.
|
||||
*/
|
||||
void cacheClear();
|
||||
|
||||
explicit operator bool() const { return pvt.operator bool(); }
|
||||
size_t use_count() const { return pvt.use_count(); }
|
||||
private:
|
||||
|
||||
@@ -84,6 +84,9 @@ struct Tester {
|
||||
} else {
|
||||
testSkip(1, "timeout");
|
||||
}
|
||||
|
||||
op.reset();
|
||||
cli.cacheClear();
|
||||
}
|
||||
|
||||
void loopback()
|
||||
|
||||
@@ -60,6 +60,9 @@ struct Tester {
|
||||
} else {
|
||||
testSkip(1, "timeout");
|
||||
}
|
||||
|
||||
op.reset();
|
||||
cli.cacheClear();
|
||||
}
|
||||
|
||||
void loopback()
|
||||
|
||||
@@ -85,6 +85,9 @@ struct Tester : public TesterBase
|
||||
} else {
|
||||
testSkip(2, "timeout");
|
||||
}
|
||||
|
||||
op.reset();
|
||||
cli.cacheClear();
|
||||
}
|
||||
|
||||
void loopback(bool get)
|
||||
|
||||
Reference in New Issue
Block a user