From f69523bace7800846bbe3e16671776a4e172126a Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 9 Jun 2021 20:28:51 +0200 Subject: [PATCH] Remove IntoBinnedT --- disk/src/agg/binnedt2.rs | 188 --------------------------------------- disk/src/binned.rs | 15 +--- disk/src/binnedstream.rs | 114 ------------------------ 3 files changed, 2 insertions(+), 315 deletions(-) diff --git a/disk/src/agg/binnedt2.rs b/disk/src/agg/binnedt2.rs index 8577ae1..e00ac0a 100644 --- a/disk/src/agg/binnedt2.rs +++ b/disk/src/agg/binnedt2.rs @@ -24,194 +24,6 @@ pub trait AggregatableTdim2: Sized { fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator; } -pub trait IntoBinnedT { - type StreamOut: Stream; - fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut; -} - -impl IntoBinnedT for S -where - S: Stream>, Error>> + Unpin, - I: AggregatableTdim2 + Unpin, - I::Aggregator: Unpin, -{ - type StreamOut = IntoBinnedTDefaultStream; - - fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut { - IntoBinnedTDefaultStream::new(self, spec) - } -} - -pub struct IntoBinnedTDefaultStream -where - S: Stream>, Error>>, - I: AggregatableTdim2, -{ - inp: S, - aggtor: Option, - spec: BinnedRange, - curbin: u32, - inp_completed: bool, - all_bins_emitted: bool, - range_complete_observed: bool, - range_complete_emitted: bool, - left: Option>, Error>>>>, - errored: bool, - completed: bool, - tmp_agg_results: VecDeque, -} - -impl IntoBinnedTDefaultStream -where - S: Stream>, Error>> + Unpin, - I: AggregatableTdim2, -{ - pub fn new(inp: S, spec: BinnedRange) -> Self { - let range = spec.get_range(0); - Self { - inp, - aggtor: Some(I::aggregator_new_static(range.beg, range.end)), - spec, - curbin: 0, - inp_completed: false, - all_bins_emitted: false, - range_complete_observed: false, - range_complete_emitted: false, - left: None, - errored: false, - completed: false, - tmp_agg_results: VecDeque::new(), - } - } - - fn cur(&mut self, cx: &mut Context) -> Poll>, Error>>> { - if let Some(cur) = self.left.take() { - cur - } else if self.inp_completed { - Poll::Ready(None) - } else { - let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); - inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) - } - } - - fn cycle_current_bin(&mut self) { - self.curbin += 1; - let range = self.spec.get_range(self.curbin); - let ret = self - .aggtor - .replace(I::aggregator_new_static(range.beg, range.end)) - // TODO handle None case, or remove Option if Agg is always present - .unwrap() - .result(); - self.tmp_agg_results = ret.into(); - if self.curbin >= self.spec.count as u32 { - self.all_bins_emitted = true; - } - } - - fn handle( - &mut self, - cur: Poll>, Error>>>, - ) -> Option>, Error>>>> { - use Poll::*; - match cur { - Ready(Some(Ok(item))) => match item { - StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))), - StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))), - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => Some(Ready(Some(Ok(StreamItem::DataItem( - RangeCompletableItem::RangeComplete, - ))))), - RangeCompletableItem::Data(item) => { - if self.all_bins_emitted { - // Just drop the item because we will not emit anymore data. - // Could also at least gather some stats. - None - } else { - let ag = self.aggtor.as_mut().unwrap(); - if ag.ends_before(&item) { - None - } else if ag.starts_after(&item) { - self.left = - Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); - self.cycle_current_bin(); - // TODO cycle_current_bin enqueues the bin, can I return here instead? - None - } else { - let mut item = item; - ag.ingest(&mut item); - let item = item; - if ag.ends_after(&item) { - self.left = - Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); - self.cycle_current_bin(); - } - // TODO cycle_current_bin enqueues the bin, can I return here instead? - None - } - } - } - }, - }, - Ready(Some(Err(e))) => { - self.errored = true; - Some(Ready(Some(Err(e)))) - } - Ready(None) => { - self.inp_completed = true; - if self.all_bins_emitted { - None - } else { - self.cycle_current_bin(); - // TODO cycle_current_bin enqueues the bin, can I return here instead? - None - } - } - Pending => Some(Pending), - } - } -} - -impl Stream for IntoBinnedTDefaultStream -where - S: Stream>, Error>> + Unpin, - I: AggregatableTdim2 + Unpin, - I::Aggregator: Unpin, -{ - type Item = Result>, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - 'outer: loop { - break if self.completed { - panic!("IntoBinnedTDefaultStream poll_next on completed"); - } else if self.errored { - self.completed = true; - Ready(None) - } else if let Some(item) = self.tmp_agg_results.pop_front() { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) - } else if self.range_complete_emitted { - self.completed = true; - Ready(None) - } else if self.inp_completed && self.all_bins_emitted { - self.range_complete_emitted = true; - if self.range_complete_observed { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) - } else { - continue 'outer; - } - } else { - let cur = self.cur(cx); - match self.handle(cur) { - Some(item) => item, - None => continue 'outer, - } - }; - } - } -} - pub struct MinMaxAvgScalarBinBatchAgg { ts1: u64, ts2: u64, diff --git a/disk/src/binned.rs b/disk/src/binned.rs index e5b8dc6..258e467 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -8,7 +8,7 @@ use crate::agg::streams::{Appendable, Collectable, Collector, StreamItem, ToJson use crate::agg::{Fits, FitsInside}; use crate::binned::binnedfrompbv::BinnedFromPreBinned; use crate::binned::query::{BinnedQuery, PreBinnedQuery}; -use crate::binnedstream::{BinnedScalarStreamFromPreBinnedPatches, BoxedStream}; +use crate::binnedstream::BoxedStream; use crate::cache::MergedFromRemotes; use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValues, EventValuesDim0Case, EventValuesDim1Case, @@ -1283,18 +1283,7 @@ impl StreamKind for BinnedStreamKindScalar { pre_range: PreBinnedPatchRange, node_config: &NodeConfigCached, ) -> Result { - let s = BinnedScalarStreamFromPreBinnedPatches::new( - PreBinnedPatchIterator::from_range(pre_range), - query.channel().clone(), - range.clone(), - query.agg_kind().clone(), - query.cache_usage().clone(), - node_config, - query.disk_stats_every().clone(), - query.report_error(), - self.clone(), - )?; - Ok(BoxedStream::new(Box::pin(s))?) + err::todoval() } fn new_binned_from_merged( diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index 658545b..7ee5cb9 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -12,120 +12,6 @@ use std::future::ready; use std::pin::Pin; use std::task::{Context, Poll}; -// TODO remove after refactor. -pub struct BinnedScalarStreamFromPreBinnedPatches -where - SK: StreamKind, -{ - inp: Pin< - Box::TBinnedBins>>, Error>> + Send>, - >, - _stream_kind: SK, -} - -impl BinnedScalarStreamFromPreBinnedPatches -where - SK: StreamKind, - Result::TBinnedBins>>, Error>: FrameType, -{ - pub fn new( - patch_it: PreBinnedPatchIterator, - channel: Channel, - range: BinnedRange, - agg_kind: AggKind, - cache_usage: CacheUsage, - node_config: &NodeConfigCached, - disk_stats_every: ByteSize, - report_error: bool, - stream_kind: SK, - ) -> Result { - let patches: Vec<_> = patch_it.collect(); - let mut sp = String::new(); - if false { - // Convert this to a StreamLog message: - for (i, p) in patches.iter().enumerate() { - use std::fmt::Write; - write!(sp, " • patch {:2} {:?}\n", i, p)?; - } - info!("Using these pre-binned patches:\n{}", sp); - } - let pmax = patches.len(); - let inp = futures_util::stream::iter(patches.into_iter().enumerate()) - .map({ - let node_config = node_config.clone(); - let stream_kind = stream_kind.clone(); - move |(pix, patch)| { - let query = PreBinnedQuery::new( - patch, - channel.clone(), - agg_kind.clone(), - cache_usage.clone(), - disk_stats_every.clone(), - report_error, - ); - let ret: Pin + Send>> = - match PreBinnedScalarValueFetchedStream::new(&query, &node_config, &stream_kind) { - Ok(stream) => Box::pin(stream.map(move |q| (pix, q))), - Err(e) => { - error!("error from PreBinnedValueFetchedStream::new {:?}", e); - Box::pin(futures_util::stream::iter(vec![(pix, Err(e))])) - } - }; - ret - } - }) - .flatten() - .filter_map({ - let range = range.clone(); - move |(pix, k)| { - let fit_range = range.full_range(); - let g = match k { - Ok(item) => match item { - StreamItem::Log(item) => Some(Ok(StreamItem::Log(item))), - StreamItem::Stats(item) => Some(Ok(StreamItem::Stats(item))), - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - if pix + 1 == pmax { - Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))) - } else { - None - } - } - RangeCompletableItem::Data(item) => { - match crate::binned::FilterFittingInside::filter_fitting_inside(item, fit_range) { - Some(item) => Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))), - None => None, - } - } - }, - }, - Err(e) => Some(Err(e)), - }; - ready(g) - } - }); - // TODO activate the T-binning via the bin-to-bin binning trait. - //err::todo(); - let inp = crate::agg::binnedt2::IntoBinnedT::into_binned_t(inp, range); - Ok(Self { - inp: Box::pin(inp), - _stream_kind: stream_kind, - }) - } -} - -// TODO remove after SK no longer needed. -impl Stream for BinnedScalarStreamFromPreBinnedPatches -where - SK: StreamKind, -{ - type Item = Result>, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.inp.poll_next_unpin(cx) - } -} - // TODO remove after refactor. pub struct BoxedStream { inp: Pin + Send>>,