From febc8233014b415aa212258637f33f4b62a30fce Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Tue, 21 Sep 2021 08:08:53 -0700 Subject: [PATCH] Client subscription add batch pop() and stats() --- src/clientmon.cpp | 81 +++++++++++++++++++++++++++++++++++++++++------ src/pvxs/client.h | 59 ++++++++++++++++++++++++++++------ src/pvxs/source.h | 10 +++--- 3 files changed, 126 insertions(+), 24 deletions(-) diff --git a/src/clientmon.cpp b/src/clientmon.cpp index 80ba69b..18e82e4 100644 --- a/src/clientmon.cpp +++ b/src/clientmon.cpp @@ -66,6 +66,9 @@ struct SubscriptionImpl : public OperationBase, public Subscription std::deque queue; uint32_t window =0u; // flow control window. number of updates server may send to us uint32_t unack =0u; // updates pop()'d, but not ack'd + size_t nSrvSquash =0u; + size_t nCliSquash =0u; + size_t queueMax =0u; // user code has seen pop()==nullptr bool needNotify = true; bool ackPending = false; // ackTick scheduled @@ -140,14 +143,27 @@ struct SubscriptionImpl : public OperationBase, public Subscription }); } - virtual Value pop() override final - { - Value ret; - { - Guard G(lock); + virtual void stats(SubscriptionStat& ret, bool reset) override final { + Guard G(lock); + ret.limitQueue = queueSize; + ret.maxQueue = queueMax; + ret.nSrvSquash = nSrvSquash; + ret.nCliSquash = nCliSquash; + ret.nQueue = queue.size(); + if(reset) { + nSrvSquash = nCliSquash = queueMax = 0u; + } + } + void _pop(Value& ret, bool canthrow) + { + { if(!queue.empty()) { - auto ent(queue.front()); + auto ent(std::move(queue.front())); + + if(!canthrow && ent.exc) + return; + queue.pop_front(); if(pipeline) { @@ -170,7 +186,6 @@ struct SubscriptionImpl : public OperationBase, public Subscription unack++; } - log_info_printf(monevt, "channel '%s' monitor pop() %s %u,%u\n", channelName.c_str(), ent.exc ? "exception" : ent.val ? "data" : "null!", @@ -188,9 +203,42 @@ struct SubscriptionImpl : public OperationBase, public Subscription channelName.c_str()); } } + } + + virtual Value pop() override final + { + Value ret; + { + Guard G(lock); + _pop(ret, true); + } return ret; } + virtual bool doPop(std::vector& out, size_t limit) override final + { + out.clear(); + + if(!limit) { + limit = queueSize; // alloc for worst case + } + + out.reserve(limit); + + Guard G(lock); + + while(out.size() < limit) { + Value temp; + _pop(temp, out.empty()); // only throw if out is empty + if(!temp) + break; + + out.emplace_back(std::move(temp)); + } + + return !needNotify; + } + virtual std::shared_ptr shared_from_this() const override final { // on worker? std::shared_ptr ret; @@ -436,6 +484,7 @@ void Connection::handle_MONITOR() from_wire_type(M, rxRegistry, data); RequestInfo* info=nullptr; + bool servSquash = false; if(M.good()) { auto it = opByIOID.find(ioid); if(it!=opByIOID.end()) { @@ -463,7 +512,15 @@ void Connection::handle_MONITOR() BitMask overrun; from_wire(M, overrun); - (void)overrun; // ignoring + for(auto i : range(overrun.wsize())) { + (void)i; + if(overrun.word(i)) { + // this update Value is the result of combining + // two or more Values on the server side. + servSquash = true; + break; + } + } } } @@ -589,6 +646,7 @@ void Connection::handle_MONITOR() mon->chan->name.c_str()); mon->queue.back().val.assign(update.val); + mon->nCliSquash++; } if(final && !update.exc) { @@ -604,11 +662,16 @@ void Connection::handle_MONITOR() peerName.c_str(), mon->chan->name.c_str()); notify = false; } + + if(mon->queueMax < mon->queue.size()) + mon->queueMax = mon->queue.size(); } if(notify) notify = mon->wantToNotify(); - } + if(servSquash) + mon->nSrvSquash++; + } // release mon->lock if(mon->state==SubscriptionImpl::Done || final) { mon->state=SubscriptionImpl::Done; diff --git a/src/pvxs/client.h b/src/pvxs/client.h index 01bef87..685a3f8 100644 --- a/src/pvxs/client.h +++ b/src/pvxs/client.h @@ -157,9 +157,23 @@ public: #endif }; +//! Information about the state of a Subscription +struct SubscriptionStat { + //! Number of events in the queue + size_t nQueue=0; + //! Number of Value updates where the server reported at least + //! one update dropped/squashed. + size_t nSrvSquash=0; + //! Number of Value updates dropped/squashed due to client queue overflow + size_t nCliSquash=0; + //! Max queue size so far + size_t maxQueue=0; + //! Limit on queue size + size_t limitQueue=0; +}; + //! Handle for monitor subscription struct PVXS_API Subscription { - virtual ~Subscription() =0; protected: @@ -172,16 +186,16 @@ public: //! Blocks until any in-progress callback has completed. virtual bool cancel() =0; - //! Ask a server to stop sending updates to this Subscription + //! Ask a server to stop (true) or re-start (false), sending updates to this Subscription virtual void pause(bool p=true) =0; //! Shorthand for @code pause(false) @endcode inline void resume() { pause(false); } /** De-queue update from subscription event queue. * - * If the queue is empty, return an empty/invalid Value (Value::valid()==false). - * A data update is returned as a Value. - * An error or special event is thrown. + * If the queue is empty, return an empty/invalid Value (Value::valid()==false). + * A data update is returned as a Value. + * An error or special event is thrown. * * @returns A valid Value until the queue is empty * @throws Connected (depending on MonitorBuilder::maskConnected()) @@ -194,18 +208,43 @@ public: * std::shared_ptr sub(...); * try { * while(auto update = sub.pop()) { + * // have data update * ... * } - * } catch(Connected& con) { - * } catch(Finished& con) { - * } catch(Disconnect& con) { - * } catch(RemoteError& con) { - * } catch(std::exception& con) { + * // queue empty + * } catch(Connected& con) { // if MonitorBuilder::maskConnected(false) + * } catch(Finished& con) { // if MonitorBuilder::maskDisconnected(false) + * } catch(Disconnect& con) { // if MonitorBuilder::maskDisconnected(false) + * } catch(RemoteError& con) { // error message from server + * } catch(std::exception& con) { // client side error * } * @endcode */ virtual Value pop() =0; +protected: + virtual bool doPop(std::vector& out, size_t limit=0u) =0; +public: + +#ifdef PVXS_EXPERT_API_ENABLED + /** De-queue a batch of updates from subscription event queue. + * + * @param out Updated with any Values dequeued. Will always be clear()d + * @param limit When non-zero, an upper limit on the number of Values which will be dequeued. + * @return true if the queue was not emptied, and pop() should be called again. + * false if the queue was emptied, and a further onEvent() callback may be awaited. + * @throws the same exceptions as non-batch pop() + * + * @since UNRELEASED Added + */ + inline bool pop(std::vector& out, size_t limit=0u) + { return doPop(out, limit); } +#endif + + //! Poll statistics + //! @since UNRELEASED + virtual void stats(SubscriptionStat&, bool reset = false) =0; + protected: virtual void _onEvent(std::function&&) =0; public: diff --git a/src/pvxs/source.h b/src/pvxs/source.h index 079dd54..5e74727 100644 --- a/src/pvxs/source.h +++ b/src/pvxs/source.h @@ -45,15 +45,15 @@ public: //! Information about a running monitor struct MonitorStat { //! Number of available elements in the output flow window. - size_t window; + size_t window=0; //! Number of un-sent updates in the local queue. Doesn't count updates //! serialized and in the TX buffer. - size_t nQueue, limitQueue; + size_t nQueue=0, limitQueue=0; - bool running; - bool finished; - bool pipeline; + bool running=false; + bool finished=false; + bool pipeline=false; }; //! Handle for active subscription