client: add non-intrusive free-list for subscription queue

This commit is contained in:
Michael Davidsaver
2022-11-17 11:47:36 -08:00
parent 53f83b6429
commit 4141775c71
3 changed files with 58 additions and 2 deletions
+47 -2
View File
@@ -504,10 +504,49 @@ void Connection::handle_MONITOR()
} else if(init) {
info->prototype = std::move(data);
// initialize info->fl later, with access to queueSize
} else if(!final || !M.empty()) {
data = info->prototype.cloneEmpty();
// Take from free-list of pre-allocated Value
Value raw;
{
Guard G(info->fl->lock);
if(!info->fl->unused.empty()) {
raw = std::move(info->fl->unused.back());
info->fl->unused.pop_back();
} else {
raw = info->prototype.cloneEmpty();
}
}
// Wrap Value for automatic return to our free-list
{
std::weak_ptr<RequestFL> wfl(info->fl);
auto desc(Value::Helper::desc(raw));
auto store(Value::Helper::store_ptr(raw));
Value::Helper::store(data).reset(
store,
// ugly bind() to capture by move instead of copy to avoid extra ref-counts
std::bind(
[](FieldStorage*, Value& data, std::weak_ptr<RequestFL>& wfl) mutable {
// maybe on worker or user thread
auto real(std::move(data));
if(auto fl = wfl.lock()) {
Guard G(fl->lock);
if(fl->unused.size() < fl->limit) {
real.clear();
fl->unused.emplace_back(std::move(real));
}
}
}, std::placeholders::_1, std::move(raw), std::move(wfl))
);
Value::Helper::set_desc(data, desc);
}
from_wire_valid(M, rxRegistry, data);
BitMask overrun;
@@ -615,7 +654,13 @@ void Connection::handle_MONITOR()
mon->queue.emplace_back(std::move(update));
notify = true;
} else if(!init) {
} else if(init) {
/* Allow enough for user to hold/process one full queue while
* accumulate another.
*/
info->fl = std::make_shared<RequestFL>(2u*mon->queueSize);
} else {
if(mon->pipeline) {
if(mon->window) {