From 530f88cd442776eb9feff6d0305d5c2e8e02449d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 18 May 2021 08:37:02 +0200 Subject: [PATCH] Output number of remaining bins for timed out response --- disk/src/binned.rs | 45 +++++++++++++++++++++++++++++++++------------ netpod/src/lib.rs | 10 +++++----- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 6676d54..b830784 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -12,12 +12,18 @@ use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange}; +use num_traits::Zero; use serde::{Deserialize, Serialize, Serializer}; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result { +pub struct BinnedStreamRes { + pub binned_stream: BinnedStream, + pub range: BinnedRange, +} + +pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result { if query.channel().backend != node_config.node.backend { let err = Error::with_msg(format!( "backend mismatch node: {} requested: {}", @@ -48,13 +54,17 @@ pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery) let s1 = BinnedStreamFromPreBinnedPatches::new( PreBinnedPatchIterator::from_range(pre_range), query.channel().clone(), - range, + range.clone(), query.agg_kind().clone(), query.cache_usage().clone(), node_config, query.disk_stats_every().clone(), )?; - let ret = BinnedStream::new(Box::pin(s1))?; + let s = BinnedStream::new(Box::pin(s1))?; + let ret = BinnedStreamRes { + binned_stream: s, + range, + }; Ok(ret) } Ok(None) => { @@ -68,9 +78,13 @@ pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery) agg_kind: query.agg_kind().clone(), }; // TODO do I need to set up more transformations or binning to deliver the requested data? - let s1 = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone()); - let s1 = s1.into_binned_t(range); - let ret = BinnedStream::new(Box::pin(s1))?; + let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone()); + let s = s.into_binned_t(range.clone()); + let s = BinnedStream::new(Box::pin(s))?; + let ret = BinnedStreamRes { + binned_stream: s, + range, + }; Ok(ret) } Err(e) => Err(e), @@ -83,8 +97,8 @@ pub async fn binned_bytes_for_http( node_config: &NodeConfigCached, query: &BinnedQuery, ) -> Result { - let s1 = binned_stream(node_config, query).await?; - let ret = BinnedBytesForHttpStream::new(s1); + let res = binned_stream(node_config, query).await?; + let ret = BinnedBytesForHttpStream::new(res.binned_stream); Ok(Box::pin(ret)) } @@ -163,9 +177,9 @@ pub struct BinnedJsonResult { ts_bin_edges: Vec, counts: Vec, #[serde(skip_serializing_if = "Bool::is_false")] - partial_content: bool, - #[serde(skip_serializing_if = "Bool::is_false")] finalised_range: bool, + #[serde(skip_serializing_if = "Zero::is_zero")] + missing_bins: u64, #[serde(skip_serializing_if = "Option::is_none")] continue_at: Option, } @@ -173,7 +187,10 @@ pub struct BinnedJsonResult { pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result { let deadline = tokio::time::Instant::now() + Duration::from_millis(1000); let mut batch = MinMaxAvgScalarBinBatch::empty(); - let mut items = binned_stream(node_config, query).await?; + let t = binned_stream(node_config, query).await?; + let bin_count_exp = t.range.count; + let mut bin_count = 0; + let mut items = t.binned_stream; let mut i1 = 0; let mut partial_content = false; let mut finalised_range = false; @@ -196,6 +213,7 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Ok(item) => match item { Values(mut vals) => { // TODO gather stats about the batch sizes. + bin_count += vals.ts1s.len() as u64; batch.ts1s.append(&mut vals.ts1s); batch.ts2s.append(&mut vals.ts2s); batch.counts.append(&mut vals.counts); @@ -235,10 +253,13 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> } else { None }; + if bin_count_exp < bin_count { + Err(Error::with_msg("bin_count_exp < bin_count"))? + } let ret = BinnedJsonResult { ts_bin_edges: tsa, counts: batch.counts, - partial_content, + missing_bins: bin_count_exp - bin_count, finalised_range, continue_at, }; diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 146c914..ad8b125 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -561,11 +561,11 @@ impl BinnedRange { let t = BIN_THRESHOLDS[i1]; if t <= bs || i1 == 0 { let grid_spec = BinnedGridSpec { bin_t_len: t }; - let pl = grid_spec.bin_t_len(); - let ts1 = range.beg / pl * pl; - let ts2 = (range.end + pl - 1) / pl * pl; - let count = (ts2 - ts1) / pl; - let offset = ts1 / pl; + let bl = grid_spec.bin_t_len(); + let ts1 = range.beg / bl * bl; + let ts2 = (range.end + bl - 1) / bl * bl; + let count = (ts2 - ts1) / bl; + let offset = ts1 / bl; let ret = Self { grid_spec, count,