From 38e17d440b9e06f4f1c1ee6dcc9fa45dbb36837d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 9 Jun 2021 20:34:47 +0200 Subject: [PATCH] Remove binnedt3 and binnedx --- disk/src/agg.rs | 2 - disk/src/agg/binnedt3.rs | 252 --------------------------------------- disk/src/agg/binnedx.rs | 1 - disk/src/binned.rs | 19 ++- 4 files changed, 14 insertions(+), 260 deletions(-) delete mode 100644 disk/src/agg/binnedt3.rs delete mode 100644 disk/src/agg/binnedx.rs diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 448a4f4..edb3abf 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -17,9 +17,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -pub mod binnedt3; pub mod binnedt4; -pub mod binnedx; pub mod enp; pub mod eventbatch; pub mod scalarbinbatch; diff --git a/disk/src/agg/binnedt3.rs b/disk/src/agg/binnedt3.rs deleted file mode 100644 index e134a03..0000000 --- a/disk/src/agg/binnedt3.rs +++ /dev/null @@ -1,252 +0,0 @@ -use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; -use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use crate::agg::streams::StreamItem; -use crate::binned::{RangeCompletableItem, RangeOverlapInfo}; -use err::Error; -use futures_core::Stream; -use futures_util::StreamExt; -use netpod::log::*; -use netpod::{BinnedRange, NanoRange}; -use std::collections::VecDeque; -use std::pin::Pin; -use std::task::{Context, Poll}; - -pub trait Aggregator3Tdim { - type InputValue; - type OutputValue; -} - -pub struct Agg3 { - range: NanoRange, - count: u64, - min: f32, - max: f32, - sum: f32, - sumc: u64, -} - -impl Agg3 { - fn new(range: NanoRange) -> Self { - Self { - range, - count: 0, - min: f32::MAX, - max: f32::MIN, - sum: f32::NAN, - sumc: 0, - } - } - - fn ingest(&mut self, item: &MinMaxAvgScalarEventBatch) { - for i1 in 0..item.tss.len() { - let ts = item.tss[i1]; - if ts < self.range.beg { - continue; - } else if ts >= self.range.end { - continue; - } else { - self.count += 1; - self.min = self.min.min(item.mins[i1]); - self.max = self.max.max(item.maxs[i1]); - let x = item.avgs[i1]; - if x.is_nan() { - } else { - if self.sum.is_nan() { - self.sum = x; - } else { - self.sum += x; - } - self.sumc += 1; - } - } - } - } - - fn result(self) -> Vec { - let min = if self.min == f32::MAX { f32::NAN } else { self.min }; - let max = if self.max == f32::MIN { f32::NAN } else { self.max }; - let avg = if self.sumc == 0 { - f32::NAN - } else { - self.sum / self.sumc as f32 - }; - let v = MinMaxAvgScalarBinBatch { - ts1s: vec![self.range.beg], - ts2s: vec![self.range.end], - counts: vec![self.count], - mins: vec![min], - maxs: vec![max], - avgs: vec![avg], - }; - vec![v] - } -} - -impl Aggregator3Tdim for Agg3 { - type InputValue = MinMaxAvgScalarEventBatch; - type OutputValue = MinMaxAvgScalarBinBatch; -} - -pub struct BinnedT3Stream { - // TODO get rid of box: - inp: Pin>, Error>> + Send>>, - //aggtor: Option<<::XBinnedEvents as AggregatableTdim>::Aggregator>, - aggtor: Option, - spec: BinnedRange, - curbin: u32, - inp_completed: bool, - all_bins_emitted: bool, - range_complete_observed: bool, - range_complete_emitted: bool, - left: Option>, Error>>>>, - errored: bool, - completed: bool, - tmp_agg_results: VecDeque, -} - -impl BinnedT3Stream { - pub fn new(inp: S, spec: BinnedRange) -> Self - where - S: Stream>, Error>> + Send + 'static, - { - let range = spec.get_range(0); - Self { - inp: Box::pin(inp), - aggtor: Some(Agg3::new(range)), - spec, - curbin: 0, - inp_completed: false, - all_bins_emitted: false, - range_complete_observed: false, - range_complete_emitted: false, - left: None, - errored: false, - completed: false, - tmp_agg_results: VecDeque::new(), - } - } - - fn cur( - &mut self, - cx: &mut Context, - ) -> Poll>, Error>>> { - if let Some(cur) = self.left.take() { - cur - } else if self.inp_completed { - Poll::Ready(None) - } else { - let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); - inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) - } - } - - fn cycle_current_bin(&mut self) { - self.curbin += 1; - let range = self.spec.get_range(self.curbin); - let ret = self - .aggtor - .replace(Agg3::new(range)) - // TODO handle None case, or remove Option if Agg is always present - .unwrap() - .result(); - self.tmp_agg_results = VecDeque::from(ret); - if self.curbin >= self.spec.count as u32 { - self.all_bins_emitted = true; - } - } - - fn handle( - &mut self, - cur: Poll>, Error>>>, - ) -> Option>, Error>>>> { - use Poll::*; - match cur { - Ready(Some(Ok(item))) => match item { - StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))), - StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))), - StreamItem::DataItem(item) => match item { - RangeCompletableItem::RangeComplete => { - self.range_complete_observed = true; - None - } - RangeCompletableItem::Data(item) => { - if self.all_bins_emitted { - // Just drop the item because we will not emit anymore data. - // Could also at least gather some stats. - None - } else { - let ag = self.aggtor.as_mut().unwrap(); - if item.ends_before(ag.range.clone()) { - None - } else if item.starts_after(ag.range.clone()) { - self.left = - Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); - self.cycle_current_bin(); - // TODO cycle_current_bin enqueues the bin, can I return here instead? - None - } else { - ag.ingest(&item); - if item.ends_after(ag.range.clone()) { - self.left = - Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); - self.cycle_current_bin(); - } - // TODO cycle_current_bin enqueues the bin, can I return here instead? - None - } - } - } - }, - }, - Ready(Some(Err(e))) => { - self.errored = true; - Some(Ready(Some(Err(e)))) - } - Ready(None) => { - self.inp_completed = true; - if self.all_bins_emitted { - None - } else { - self.cycle_current_bin(); - // TODO cycle_current_bin enqueues the bin, can I return here instead? - None - } - } - Pending => Some(Pending), - } - } -} - -impl Stream for BinnedT3Stream { - type Item = Result>, Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - 'outer: loop { - break if self.completed { - panic!("IntoBinnedTDefaultStream poll_next on completed"); - } else if self.errored { - self.completed = true; - Ready(None) - } else if let Some(item) = self.tmp_agg_results.pop_front() { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))) - } else if self.range_complete_emitted { - self.completed = true; - Ready(None) - } else if self.inp_completed && self.all_bins_emitted { - self.range_complete_emitted = true; - if self.range_complete_observed { - Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete)))) - } else { - continue 'outer; - } - } else { - let cur = self.cur(cx); - match self.handle(cur) { - Some(item) => item, - None => continue 'outer, - } - }; - } - } -} diff --git a/disk/src/agg/binnedx.rs b/disk/src/agg/binnedx.rs deleted file mode 100644 index 8b13789..0000000 --- a/disk/src/agg/binnedx.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/disk/src/binned.rs b/disk/src/binned.rs index a642690..30792e6 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,4 +1,3 @@ -use crate::agg::binnedt3::{Agg3, BinnedT3Stream}; use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggregator}; use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; @@ -1258,6 +1257,18 @@ pub enum RangeCompletableItem { Data(T), } +pub struct Agg3 {} + +pub struct BinnedT3Stream {} + +impl Stream for BinnedT3Stream { + type Item = Sitemty; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + todo!() + } +} + impl StreamKind for BinnedStreamKindScalar { // TODO is this really needed? type TBinnedStreamType = BoxedStream>, Error>>; @@ -1283,15 +1294,13 @@ impl StreamKind for BinnedStreamKindScalar { range: BinnedRange, node_config: &NodeConfigCached, ) -> Result { - let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone()); - let s = Self::xbinned_to_tbinned(s, range); - Ok(BoxedStream::new(Box::pin(s))?) + err::todoval() } fn xbinned_to_tbinned(inp: S, spec: BinnedRange) -> Self::XBinnedToTBinnedStream where S: Stream>, Error>> + Send + 'static, { - Self::XBinnedToTBinnedStream::new(inp, spec) + err::todoval() } }