Client subscription add batch pop() and stats()

This commit is contained in:
Michael Davidsaver
2021-09-21 08:08:53 -07:00
parent a4c6540c97
commit febc823301
3 changed files with 126 additions and 24 deletions
+72 -9
View File
@@ -66,6 +66,9 @@ struct SubscriptionImpl : public OperationBase, public Subscription
std::deque<Entry> queue;
uint32_t window =0u; // flow control window. number of updates server may send to us
uint32_t unack =0u; // updates pop()'d, but not ack'd
size_t nSrvSquash =0u;
size_t nCliSquash =0u;
size_t queueMax =0u;
// user code has seen pop()==nullptr
bool needNotify = true;
bool ackPending = false; // ackTick scheduled
@@ -140,14 +143,27 @@ struct SubscriptionImpl : public OperationBase, public Subscription
});
}
virtual Value pop() override final
{
Value ret;
{
Guard G(lock);
virtual void stats(SubscriptionStat& ret, bool reset) override final {
Guard G(lock);
ret.limitQueue = queueSize;
ret.maxQueue = queueMax;
ret.nSrvSquash = nSrvSquash;
ret.nCliSquash = nCliSquash;
ret.nQueue = queue.size();
if(reset) {
nSrvSquash = nCliSquash = queueMax = 0u;
}
}
void _pop(Value& ret, bool canthrow)
{
{
if(!queue.empty()) {
auto ent(queue.front());
auto ent(std::move(queue.front()));
if(!canthrow && ent.exc)
return;
queue.pop_front();
if(pipeline) {
@@ -170,7 +186,6 @@ struct SubscriptionImpl : public OperationBase, public Subscription
unack++;
}
log_info_printf(monevt, "channel '%s' monitor pop() %s %u,%u\n",
channelName.c_str(),
ent.exc ? "exception" : ent.val ? "data" : "null!",
@@ -188,9 +203,42 @@ struct SubscriptionImpl : public OperationBase, public Subscription
channelName.c_str());
}
}
}
virtual Value pop() override final
{
Value ret;
{
Guard G(lock);
_pop(ret, true);
}
return ret;
}
virtual bool doPop(std::vector<Value>& out, size_t limit) override final
{
out.clear();
if(!limit) {
limit = queueSize; // alloc for worst case
}
out.reserve(limit);
Guard G(lock);
while(out.size() < limit) {
Value temp;
_pop(temp, out.empty()); // only throw if out is empty
if(!temp)
break;
out.emplace_back(std::move(temp));
}
return !needNotify;
}
virtual std::shared_ptr<Subscription> shared_from_this() const override final {
// on worker?
std::shared_ptr<Subscription> ret;
@@ -436,6 +484,7 @@ void Connection::handle_MONITOR()
from_wire_type(M, rxRegistry, data);
RequestInfo* info=nullptr;
bool servSquash = false;
if(M.good()) {
auto it = opByIOID.find(ioid);
if(it!=opByIOID.end()) {
@@ -463,7 +512,15 @@ void Connection::handle_MONITOR()
BitMask overrun;
from_wire(M, overrun);
(void)overrun; // ignoring
for(auto i : range(overrun.wsize())) {
(void)i;
if(overrun.word(i)) {
// this update Value is the result of combining
// two or more Values on the server side.
servSquash = true;
break;
}
}
}
}
@@ -589,6 +646,7 @@ void Connection::handle_MONITOR()
mon->chan->name.c_str());
mon->queue.back().val.assign(update.val);
mon->nCliSquash++;
}
if(final && !update.exc) {
@@ -604,11 +662,16 @@ void Connection::handle_MONITOR()
peerName.c_str(), mon->chan->name.c_str());
notify = false;
}
if(mon->queueMax < mon->queue.size())
mon->queueMax = mon->queue.size();
}
if(notify)
notify = mon->wantToNotify();
}
if(servSquash)
mon->nSrvSquash++;
} // release mon->lock
if(mon->state==SubscriptionImpl::Done || final) {
mon->state=SubscriptionImpl::Done;