From 4141775c716ecfdf30780ae8d16026d601e4f673 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Thu, 17 Nov 2022 11:47:36 -0800 Subject: [PATCH] client: add non-intrusive free-list for subscription queue --- src/clientimpl.h | 9 +++++++++ src/clientmon.cpp | 49 +++++++++++++++++++++++++++++++++++++++++++++-- src/dataimpl.h | 2 ++ 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/src/clientimpl.h b/src/clientimpl.h index 485b303..f6b40d5 100644 --- a/src/clientimpl.h +++ b/src/clientimpl.h @@ -62,12 +62,21 @@ struct OperationBase : public Operation virtual void interrupt() override final; }; +struct RequestFL { + const size_t limit; + epicsMutex lock; + std::vector unused; + + explicit RequestFL(size_t limit) :limit(limit) {} +}; + struct RequestInfo { const uint32_t sid, ioid; const Operation::operation_t op; const std::weak_ptr handle; Value prototype; + std::shared_ptr fl; RequestInfo(uint32_t sid, uint32_t ioid, std::shared_ptr& handle); }; diff --git a/src/clientmon.cpp b/src/clientmon.cpp index 18e82e4..eb76be6 100644 --- a/src/clientmon.cpp +++ b/src/clientmon.cpp @@ -504,10 +504,49 @@ void Connection::handle_MONITOR() } else if(init) { info->prototype = std::move(data); + // initialize info->fl later, with access to queueSize } else if(!final || !M.empty()) { - data = info->prototype.cloneEmpty(); + // Take from free-list of pre-allocated Value + Value raw; + { + Guard G(info->fl->lock); + + if(!info->fl->unused.empty()) { + raw = std::move(info->fl->unused.back()); + info->fl->unused.pop_back(); + + } else { + raw = info->prototype.cloneEmpty(); + } + } + // Wrap Value for automatic return to our free-list + { + std::weak_ptr wfl(info->fl); + auto desc(Value::Helper::desc(raw)); + auto store(Value::Helper::store_ptr(raw)); + + Value::Helper::store(data).reset( + store, + // ugly bind() to capture by move instead of copy to avoid extra ref-counts + std::bind( + [](FieldStorage*, Value& data, std::weak_ptr& wfl) mutable { + // maybe on worker or user thread + auto real(std::move(data)); + if(auto fl = wfl.lock()) { + Guard G(fl->lock); + if(fl->unused.size() < fl->limit) { + real.clear(); + fl->unused.emplace_back(std::move(real)); + } + } + + }, std::placeholders::_1, std::move(raw), std::move(wfl)) + ); + + Value::Helper::set_desc(data, desc); + } from_wire_valid(M, rxRegistry, data); BitMask overrun; @@ -615,7 +654,13 @@ void Connection::handle_MONITOR() mon->queue.emplace_back(std::move(update)); notify = true; - } else if(!init) { + } else if(init) { + /* Allow enough for user to hold/process one full queue while + * accumulate another. + */ + info->fl = std::make_shared(2u*mon->queueSize); + + } else { if(mon->pipeline) { if(mon->window) { diff --git a/src/dataimpl.h b/src/dataimpl.h index cc7c93b..bf09401 100644 --- a/src/dataimpl.h +++ b/src/dataimpl.h @@ -27,11 +27,13 @@ struct Value::Helper { static inline Value build(const std::shared_ptr& desc, const std::shared_ptr& pstore, const impl::FieldDesc* pdesc); + static Value build(const void* ptr, StoreType type); static inline std::shared_ptr& store( Value& v) { return v.store; } static inline std::shared_ptr store(const Value& v) { return v.store; } static constexpr const FieldDesc* desc(const Value& v) { return v.desc; } + static inline void set_desc(Value& v, const FieldDesc* desc) { v.desc = desc; } static inline impl::FieldStorage* store_ptr( Value& v) { return v.store.get(); } static inline const impl::FieldStorage* store_ptr(const Value& v) { return v.store.get(); }