WIP on factor stream kind

This commit is contained in:
Dominik Werder
2021-05-21 11:40:35 +02:00
parent a959250af9
commit b056811800
9 changed files with 64 additions and 49 deletions
+24 -10
View File
@@ -1,7 +1,7 @@
use crate::agg::binnedt::IntoBinnedT;
use crate::agg::streams::StreamItem;
use crate::binned::BinnedScalarStreamItem;
use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream};
use crate::binned::{BinnedScalarStreamItem, BinnedStreamKind};
use crate::cache::pbvfs::{PreBinnedScalarItem, PreBinnedScalarValueFetchedStream};
use crate::cache::{CacheUsage, PreBinnedQuery};
use err::Error;
use futures_core::Stream;
@@ -12,11 +12,18 @@ use std::future::ready;
use std::pin::Pin;
use std::task::{Context, Poll};
pub struct BinnedScalarStreamFromPreBinnedPatches {
pub struct BinnedScalarStreamFromPreBinnedPatches<BK>
where
BK: BinnedStreamKind,
{
inp: Pin<Box<dyn Stream<Item = Result<StreamItem<BinnedScalarStreamItem>, Error>> + Send>>,
_marker: BK::Dummy,
}
impl BinnedScalarStreamFromPreBinnedPatches {
impl<BK> BinnedScalarStreamFromPreBinnedPatches<BK>
where
BK: BinnedStreamKind,
{
pub fn new(
patch_it: PreBinnedPatchIterator,
channel: Channel,
@@ -25,6 +32,7 @@ impl BinnedScalarStreamFromPreBinnedPatches {
cache_usage: CacheUsage,
node_config: &NodeConfigCached,
disk_stats_every: ByteSize,
stream_kind: &BK,
) -> Result<Self, Error> {
let patches: Vec<_> = patch_it.collect();
let mut sp = String::new();
@@ -34,7 +42,7 @@ impl BinnedScalarStreamFromPreBinnedPatches {
use std::fmt::Write;
write!(sp, " • patch {:2} {:?}\n", i, p)?;
}
info!("BinnedStream::new\n{}", sp);
info!("Using these pre-binned patches:\n{}", sp);
}
let inp = futures_util::stream::iter(patches.into_iter())
.map({
@@ -48,7 +56,7 @@ impl BinnedScalarStreamFromPreBinnedPatches {
disk_stats_every.clone(),
);
let s: Pin<Box<dyn Stream<Item = _> + Send>> =
match PreBinnedValueFetchedStream::new(&query, &node_config) {
match PreBinnedScalarValueFetchedStream::new(&query, &node_config) {
Ok(k) => Box::pin(k),
Err(e) => {
error!("error from PreBinnedValueFetchedStream::new {:?}", e);
@@ -68,10 +76,10 @@ impl BinnedScalarStreamFromPreBinnedPatches {
StreamItem::Log(item) => Some(Ok(StreamItem::Log(item))),
StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))),
StreamItem::DataItem(item) => match item {
PreBinnedItem::RangeComplete => {
PreBinnedScalarItem::RangeComplete => {
Some(Ok(StreamItem::DataItem(BinnedScalarStreamItem::RangeComplete)))
}
PreBinnedItem::Batch(item) => {
PreBinnedScalarItem::Batch(item) => {
use super::agg::{Fits, FitsInside};
match item.fits_inside(fit_range) {
Fits::Inside
@@ -91,11 +99,17 @@ impl BinnedScalarStreamFromPreBinnedPatches {
}
})
.into_binned_t(range);
Ok(Self { inp: Box::pin(inp) })
Ok(Self {
inp: Box::pin(inp),
_marker: BK::Dummy::default(),
})
}
}
impl Stream for BinnedScalarStreamFromPreBinnedPatches {
impl<BK> Stream for BinnedScalarStreamFromPreBinnedPatches<BK>
where
BK: BinnedStreamKind,
{
type Item = Result<StreamItem<BinnedScalarStreamItem>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {