use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem; use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream}; use crate::cache::{CacheUsage, PreBinnedQuery}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; #[allow(unused_imports)] use netpod::log::*; use netpod::{AggKind, BinnedRange, Channel, NodeConfigCached, PreBinnedPatchIterator}; use std::future::ready; use std::pin::Pin; use std::task::{Context, Poll}; pub struct BinnedStream { inp: Pin> + Send>>, } impl BinnedStream { pub fn new( patch_it: PreBinnedPatchIterator, channel: Channel, range: BinnedRange, agg_kind: AggKind, cache_usage: CacheUsage, node_config: &NodeConfigCached, ) -> Result { let patches: Vec<_> = patch_it.collect(); let mut sp = String::new(); for (i, p) in patches.iter().enumerate() { use std::fmt::Write; write!(sp, " • patch {:2} {:?}\n", i, p)?; } info!("BinnedStream::new\n{}", sp); use super::agg::binnedt::IntoBinnedT; let inp = futures_util::stream::iter(patches.into_iter()) .map({ let node_config = node_config.clone(); move |patch| { let query = PreBinnedQuery::new(patch, channel.clone(), agg_kind.clone(), cache_usage.clone()); PreBinnedValueFetchedStream::new(&query, &node_config) } }) .filter_map(|k| match k { Ok(k) => ready(Some(k)), Err(e) => { error!("{:?}", e); ready(None) } }) .flatten() .filter_map({ let range = range.clone(); move |k| { let fit_range = range.full_range(); let g = match k { Ok(PreBinnedItem::Batch(k)) => { use super::agg::{Fits, FitsInside}; match k.fits_inside(fit_range) { Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k))), _ => None, } } Ok(PreBinnedItem::RangeComplete) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)), Ok(PreBinnedItem::EventDataReadStats(stats)) => { info!("BinnedStream observes stats {:?}", stats); Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats))) } Ok(PreBinnedItem::Log(item)) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item))), Err(e) => Some(Err(e)), }; ready(g) } }) .into_binned_t(range); Ok(Self { inp: Box::pin(inp) }) } } impl Stream for BinnedStream { // TODO make this generic over all possible things type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { self.inp.poll_next_unpin(cx) } } pub struct BinnedStreamFromMerged { inp: Pin> + Send>>, } impl BinnedStreamFromMerged { pub fn new( inp: Pin> + Send>>, ) -> Result { Ok(Self { inp }) } } impl Stream for BinnedStreamFromMerged { // TODO make this generic over all possible things type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { self.inp.poll_next_unpin(cx) } }