Output number of remaining bins for timed out response
This commit is contained in:
@@ -12,12 +12,18 @@ use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
|
||||
use num_traits::Zero;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<BinnedStream, Error> {
|
||||
pub struct BinnedStreamRes {
|
||||
pub binned_stream: BinnedStream,
|
||||
pub range: BinnedRange,
|
||||
}
|
||||
|
||||
pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<BinnedStreamRes, Error> {
|
||||
if query.channel().backend != node_config.node.backend {
|
||||
let err = Error::with_msg(format!(
|
||||
"backend mismatch node: {} requested: {}",
|
||||
@@ -48,13 +54,17 @@ pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery)
|
||||
let s1 = BinnedStreamFromPreBinnedPatches::new(
|
||||
PreBinnedPatchIterator::from_range(pre_range),
|
||||
query.channel().clone(),
|
||||
range,
|
||||
range.clone(),
|
||||
query.agg_kind().clone(),
|
||||
query.cache_usage().clone(),
|
||||
node_config,
|
||||
query.disk_stats_every().clone(),
|
||||
)?;
|
||||
let ret = BinnedStream::new(Box::pin(s1))?;
|
||||
let s = BinnedStream::new(Box::pin(s1))?;
|
||||
let ret = BinnedStreamRes {
|
||||
binned_stream: s,
|
||||
range,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
Ok(None) => {
|
||||
@@ -68,9 +78,13 @@ pub async fn binned_stream(node_config: &NodeConfigCached, query: &BinnedQuery)
|
||||
agg_kind: query.agg_kind().clone(),
|
||||
};
|
||||
// TODO do I need to set up more transformations or binning to deliver the requested data?
|
||||
let s1 = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone());
|
||||
let s1 = s1.into_binned_t(range);
|
||||
let ret = BinnedStream::new(Box::pin(s1))?;
|
||||
let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone());
|
||||
let s = s.into_binned_t(range.clone());
|
||||
let s = BinnedStream::new(Box::pin(s))?;
|
||||
let ret = BinnedStreamRes {
|
||||
binned_stream: s,
|
||||
range,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
@@ -83,8 +97,8 @@ pub async fn binned_bytes_for_http(
|
||||
node_config: &NodeConfigCached,
|
||||
query: &BinnedQuery,
|
||||
) -> Result<BinnedStreamBox, Error> {
|
||||
let s1 = binned_stream(node_config, query).await?;
|
||||
let ret = BinnedBytesForHttpStream::new(s1);
|
||||
let res = binned_stream(node_config, query).await?;
|
||||
let ret = BinnedBytesForHttpStream::new(res.binned_stream);
|
||||
Ok(Box::pin(ret))
|
||||
}
|
||||
|
||||
@@ -163,9 +177,9 @@ pub struct BinnedJsonResult {
|
||||
ts_bin_edges: Vec<IsoDateTime>,
|
||||
counts: Vec<u64>,
|
||||
#[serde(skip_serializing_if = "Bool::is_false")]
|
||||
partial_content: bool,
|
||||
#[serde(skip_serializing_if = "Bool::is_false")]
|
||||
finalised_range: bool,
|
||||
#[serde(skip_serializing_if = "Zero::is_zero")]
|
||||
missing_bins: u64,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
continue_at: Option<IsoDateTime>,
|
||||
}
|
||||
@@ -173,7 +187,10 @@ pub struct BinnedJsonResult {
|
||||
pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) -> Result<serde_json::Value, Error> {
|
||||
let deadline = tokio::time::Instant::now() + Duration::from_millis(1000);
|
||||
let mut batch = MinMaxAvgScalarBinBatch::empty();
|
||||
let mut items = binned_stream(node_config, query).await?;
|
||||
let t = binned_stream(node_config, query).await?;
|
||||
let bin_count_exp = t.range.count;
|
||||
let mut bin_count = 0;
|
||||
let mut items = t.binned_stream;
|
||||
let mut i1 = 0;
|
||||
let mut partial_content = false;
|
||||
let mut finalised_range = false;
|
||||
@@ -196,6 +213,7 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) ->
|
||||
Ok(item) => match item {
|
||||
Values(mut vals) => {
|
||||
// TODO gather stats about the batch sizes.
|
||||
bin_count += vals.ts1s.len() as u64;
|
||||
batch.ts1s.append(&mut vals.ts1s);
|
||||
batch.ts2s.append(&mut vals.ts2s);
|
||||
batch.counts.append(&mut vals.counts);
|
||||
@@ -235,10 +253,13 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) ->
|
||||
} else {
|
||||
None
|
||||
};
|
||||
if bin_count_exp < bin_count {
|
||||
Err(Error::with_msg("bin_count_exp < bin_count"))?
|
||||
}
|
||||
let ret = BinnedJsonResult {
|
||||
ts_bin_edges: tsa,
|
||||
counts: batch.counts,
|
||||
partial_content,
|
||||
missing_bins: bin_count_exp - bin_count,
|
||||
finalised_range,
|
||||
continue_at,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user