Factor out stream kind dependence into type parameter

This commit is contained in:
Dominik Werder
2021-05-21 10:47:42 +02:00
parent 61564f502e
commit a959250af9
3 changed files with 279 additions and 29 deletions

View File

@@ -1,19 +1,20 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim, IntoBinnedT};
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator};
use crate::agg::streams::{Collectable, Collected, StreamItem, ToJsonResult};
use crate::agg::AggregatableXdim1Bin;
use crate::binned::scalar::binned_scalar_stream;
use crate::binned::scalar::{adapter_to_stream_item, binned_stream};
use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream};
use crate::cache::BinnedQuery;
use crate::cache::{BinnedQuery, MergedFromRemotes};
use crate::channelconfig::{extract_matching_config_entry, read_local_config};
use crate::frame::makeframe::make_frame;
use crate::raw::EventsQuery;
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::{AggKind, BinnedRange, NodeConfigCached};
use netpod::{AggKind, BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
use num_traits::Zero;
use serde::{Deserialize, Serialize, Serializer};
use std::pin::Pin;
@@ -233,12 +234,12 @@ pub async fn binned_bytes_for_http(
info!("binned_bytes_for_http found config entry {:?}", entry);
match query.agg_kind() {
AggKind::DimXBins1 => {
let res = binned_scalar_stream(node_config, query).await?;
let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
let ret = BinnedBytesForHttpStream::new(res.binned_stream);
Ok(Box::pin(ret))
}
AggKind::DimXBinsN(_) => {
let res = binned_scalar_stream(node_config, query).await?;
let res = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
let ret = BinnedBytesForHttpStream::new(res.binned_stream);
Ok(Box::pin(ret))
}
@@ -387,8 +388,78 @@ pub async fn binned_json(node_config: &NodeConfigCached, query: &BinnedQuery) ->
// TODO create the matching stream based on AggKind and ConfigEntry.
let t = binned_scalar_stream(node_config, query).await?;
let t = binned_stream(node_config, query, BinnedStreamKindScalar::new()).await?;
let collected = collect_all(t.binned_stream, t.range.count as u32).await?;
let ret = collected.to_json_result();
let ret = collected.to_json_result()?;
Ok(serde_json::to_value(ret)?)
}
pub trait BinnedStreamKind {
type BinnedStreamItem: MakeBytesFrame;
type BinnedStreamType: Stream + Send + 'static;
fn new_binned_from_prebinned(
query: &BinnedQuery,
range: BinnedRange,
pre_range: PreBinnedPatchRange,
node_config: &NodeConfigCached,
) -> Result<Self::BinnedStreamType, Error>;
fn new_binned_from_merged(
evq: EventsQuery,
perf_opts: PerfOpts,
range: BinnedRange,
node_config: &NodeConfigCached,
) -> Result<Self::BinnedStreamType, Error>;
}
pub struct BinnedStreamKindScalar {}
pub struct BinnedStreamKindWave {}
impl BinnedStreamKindScalar {
pub fn new() -> Self {
Self {}
}
}
impl BinnedStreamKindWave {
pub fn new() -> Self {
Self {}
}
}
impl BinnedStreamKind for BinnedStreamKindScalar {
type BinnedStreamItem = Result<StreamItem<BinnedScalarStreamItem>, Error>;
type BinnedStreamType = BinnedStream<Self::BinnedStreamItem>;
fn new_binned_from_prebinned(
query: &BinnedQuery,
range: BinnedRange,
pre_range: PreBinnedPatchRange,
node_config: &NodeConfigCached,
) -> Result<Self::BinnedStreamType, Error> {
let s = BinnedScalarStreamFromPreBinnedPatches::new(
PreBinnedPatchIterator::from_range(pre_range),
query.channel().clone(),
range.clone(),
query.agg_kind().clone(),
query.cache_usage().clone(),
node_config,
query.disk_stats_every().clone(),
)?;
Ok(BinnedStream::new(Box::pin(s))?)
}
fn new_binned_from_merged(
evq: EventsQuery,
perf_opts: PerfOpts,
range: BinnedRange,
node_config: &NodeConfigCached,
) -> Result<Self::BinnedStreamType, Error> {
let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone())
.into_binned_t(range.clone())
.map(adapter_to_stream_item);
Ok(BinnedStream::new(Box::pin(s))?)
}
}

View File

@@ -1,14 +1,14 @@
use crate::agg::binnedt::IntoBinnedT;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
use crate::agg::streams::StreamItem;
use crate::binned::{BinnedScalarStreamItem, BinnedStreamRes};
use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BinnedStream};
use crate::cache::{BinnedQuery, MergedFromRemotes};
use crate::binned::{BinnedScalarStreamItem, BinnedStreamKind, BinnedStreamRes};
use crate::binnedstream::BinnedStream;
use crate::cache::BinnedQuery;
use crate::raw::EventsQuery;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchIterator, PreBinnedPatchRange};
use netpod::{BinnedRange, NodeConfigCached, PerfOpts, PreBinnedPatchRange};
pub fn adapter_to_stream_item(
k: Result<StreamItem<MinMaxAvgScalarBinBatchStreamItem>, Error>,
@@ -30,10 +30,14 @@ pub fn adapter_to_stream_item(
}
}
pub async fn binned_scalar_stream(
pub async fn binned_stream<BK>(
node_config: &NodeConfigCached,
query: &BinnedQuery,
) -> Result<BinnedStreamRes<Result<StreamItem<BinnedScalarStreamItem>, Error>>, Error> {
stream_kind: BK,
) -> Result<BinnedStreamRes<<BK::BinnedStreamType as Stream>::Item>, Error>
where
BK: BinnedStreamKind,
{
if query.channel().backend != node_config.node.backend {
let err = Error::with_msg(format!(
"backend mismatch node: {} requested: {}",
@@ -57,16 +61,8 @@ pub async fn binned_scalar_stream(
);
return Err(Error::with_msg(msg));
}
let s1 = BinnedScalarStreamFromPreBinnedPatches::new(
PreBinnedPatchIterator::from_range(pre_range),
query.channel().clone(),
range.clone(),
query.agg_kind().clone(),
query.cache_usage().clone(),
node_config,
query.disk_stats_every().clone(),
)?;
let s = BinnedStream::new(Box::pin(s1))?;
let s = BK::new_binned_from_prebinned(query, range.clone(), pre_range, node_config)?;
let s = BinnedStream::new(Box::pin(s))?;
let ret = BinnedStreamRes {
binned_stream: s,
range,
@@ -84,9 +80,7 @@ pub async fn binned_scalar_stream(
agg_kind: query.agg_kind().clone(),
};
// TODO do I need to set up more transformations or binning to deliver the requested data?
let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone());
let s = s.into_binned_t(range.clone());
let s = s.map(adapter_to_stream_item);
let s = BK::new_binned_from_merged(evq, perf_opts, range.clone(), node_config)?;
let s = BinnedStream::new(Box::pin(s))?;
let ret = BinnedStreamRes {
binned_stream: s,