From 131283fab96adb06e85ce037f5587e2917d1678f Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Wed, 9 Jun 2021 20:16:32 +0200
Subject: [PATCH] Remove AggregatableTdim and AggregatorTdim

---
 disk/src/agg.rs                |   3 +-
 disk/src/agg/binnedt.rs        | 239 ---------------------------------
 disk/src/agg/eventbatch.rs     |  92 ------------
 disk/src/agg/scalarbinbatch.rs |  87 -----------
 disk/src/aggtest.rs            |  17 +--
 disk/src/binned.rs             |  12 +-
 6 files changed, 7 insertions(+), 443 deletions(-)

diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index 556e6d4..990927c 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -3,7 +3,6 @@
 Aggregation and binning support.
 */
 use super::eventchunker::EventFull;
-use crate::agg::binnedt::AggregatableTdim;
 use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
 use crate::agg::streams::StreamItem;
 use crate::binned::{RangeCompletableItem, StreamKind};
@@ -32,7 +31,7 @@
 pub trait AggregatableXdim1Bin<SK>
 where
     SK: StreamKind,
 {
-    type Output: AggregatableXdim1Bin<SK> + AggregatableTdim<SK>;
+    type Output: AggregatableXdim1Bin<SK>;
     fn into_agg(self) -> Self::Output;
 }
diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs
index c13cd11..8b13789 100644
--- a/disk/src/agg/binnedt.rs
+++ b/disk/src/agg/binnedt.rs
@@ -1,240 +1 @@
-use crate::agg::streams::StreamItem;
-use crate::binned::{RangeCompletableItem, StreamKind};
-use err::Error;
-use futures_core::Stream;
-use futures_util::StreamExt;
-use netpod::log::*;
-use netpod::BinnedRange;
-use std::collections::VecDeque;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-pub trait AggregatorTdim<SK>: Sized + Unpin
-where
-    SK: StreamKind,
-{
-    type InputValue;
-    type OutputValue;
-    fn ends_before(&self, inp: &Self::InputValue) -> bool;
-    fn ends_after(&self, inp: &Self::InputValue) -> bool;
-    fn starts_after(&self, inp: &Self::InputValue) -> bool;
-    fn ingest(&mut self, inp: &mut Self::InputValue);
-    fn result(self) -> Vec<Self::OutputValue>;
-}
-
-pub trait AggregatableTdim<SK>: Sized
-where
-    SK: StreamKind,
-{
-    type Aggregator: AggregatorTdim<SK>;
-    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
-}
-
-pub trait IntoBinnedT<SK, S>
-where
-    SK: StreamKind,
-    S: Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>> + Unpin,
-{
-    fn into_binned_t(self, spec: BinnedRange) -> IntoBinnedTDefaultStream<SK, S>;
-}
-
-impl<SK, S> IntoBinnedT<SK, S> for S
-where
-    SK: StreamKind,
-    S: Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>> + Unpin,
-{
-    fn into_binned_t(self, spec: BinnedRange) -> IntoBinnedTDefaultStream<SK, S> {
-        IntoBinnedTDefaultStream::new(self, spec)
-    }
-}
-
-pub struct IntoBinnedTDefaultStream<SK, S>
-where
-    SK: StreamKind,
-    S: Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>> + Unpin,
-{
-    inp: S,
-    aggtor: Option<<<SK as StreamKind>::XBinnedEvents as AggregatableTdim<SK>>::Aggregator>,
-    spec: BinnedRange,
-    curbin: u32,
-    inp_completed: bool,
-    all_bins_emitted: bool,
-    range_complete_observed: bool,
-    range_complete_emitted: bool,
-    left: Option<Poll<Option<Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>>>>,
-    errored: bool,
-    completed: bool,
-    tmp_agg_results: VecDeque<<SK as StreamKind>::TBinnedBins>,
-    _marker: std::marker::PhantomData<SK>,
-}
-
-impl<SK, S> IntoBinnedTDefaultStream<SK, S>
-where
-    SK: StreamKind,
-    S: Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>> + Unpin,
-{
-    pub fn new(inp: S, spec: BinnedRange) -> Self {
-        let range = spec.get_range(0);
-        Self {
-            inp,
-            aggtor: Some(
-                <<SK as StreamKind>::XBinnedEvents as AggregatableTdim<SK>>::aggregator_new_static(
-                    range.beg, range.end,
-                ),
-            ),
-            spec,
-            curbin: 0,
-            inp_completed: false,
-            all_bins_emitted: false,
-            range_complete_observed: false,
-            range_complete_emitted: false,
-            left: None,
-            errored: false,
-            completed: false,
-            tmp_agg_results: VecDeque::new(),
-            _marker: std::marker::PhantomData::default(),
-        }
-    }
-
-    fn cur(
-        &mut self,
-        cx: &mut Context,
-    ) -> Poll<Option<Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>>> {
-        if let Some(cur) = self.left.take() {
-            cur
-        } else if self.inp_completed {
-            Poll::Ready(None)
-        } else {
-            let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll");
-            inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx))
-        }
-    }
-
-    fn cycle_current_bin(&mut self) {
-        self.curbin += 1;
-        let range = self.spec.get_range(self.curbin);
-        let _ret = self
-            .aggtor
-            .replace(
-                <<SK as StreamKind>::XBinnedEvents as AggregatableTdim<SK>>::aggregator_new_static(
-                    range.beg, range.end,
-                ),
-            )
-            // TODO handle None case, or remove Option if Agg is always present
-            .unwrap()
-            .result();
-        // TODO retire this module
-        err::todo();
-        self.tmp_agg_results = VecDeque::new();
-        //self.tmp_agg_results = VecDeque::from(ret);
-        if self.curbin >= self.spec.count as u32 {
-            self.all_bins_emitted = true;
-        }
-    }
-
-    fn handle(
-        &mut self,
-        cur: Poll<Option<Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>>>,
-    ) -> Option<Poll<Option<Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::TBinnedBins>>, Error>>>> {
-        use Poll::*;
-        match cur {
-            Ready(Some(Ok(item))) => match item {
-                StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))),
-                StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))),
-                StreamItem::DataItem(item) => match item {
-                    RangeCompletableItem::RangeComplete => {
-                        self.range_complete_observed = true;
-                        None
-                    }
-                    RangeCompletableItem::Data(item) => {
-                        if self.all_bins_emitted {
-                            // Just drop the item because we will not emit anymore data.
-                            // Could also at least gather some stats.
-                            None
-                        } else {
-                            let ag = self.aggtor.as_mut().unwrap();
-                            //if ag.ends_before(&item) {
-                            if ag.ends_before(err::todoval()) {
-                                None
-                            //} else if ag.starts_after(&item) {
-                            } else if ag.starts_after(err::todoval()) {
-                                self.left =
-                                    Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
-                                self.cycle_current_bin();
-                                // TODO cycle_current_bin enqueues the bin, can I return here instead?
-                                None
-                            } else {
-                                let item = item;
-                                // TODO can we retire this module?
-                                //ag.ingest(&mut item);
-                                ag.ingest(err::todoval());
-                                let item = item;
-                                //if ag.ends_after(&item) {
-                                if ag.ends_after(err::todoval()) {
-                                    self.left =
-                                        Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
-                                    self.cycle_current_bin();
-                                }
-                                // TODO cycle_current_bin enqueues the bin, can I return here instead?
-                                None
-                            }
-                        }
-                    }
-                },
-            },
-            Ready(Some(Err(e))) => {
-                self.errored = true;
-                Some(Ready(Some(Err(e))))
-            }
-            Ready(None) => {
-                self.inp_completed = true;
-                if self.all_bins_emitted {
-                    None
-                } else {
-                    self.cycle_current_bin();
-                    // TODO cycle_current_bin enqueues the bin, can I return here instead?
-                    None
-                }
-            }
-            Pending => Some(Pending),
-        }
-    }
-}
-
-impl<SK, S> Stream for IntoBinnedTDefaultStream<SK, S>
-where
-    SK: StreamKind,
-    S: Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::XBinnedEvents>>, Error>> + Unpin,
-{
-    type Item = Result<StreamItem<RangeCompletableItem<<SK as StreamKind>::TBinnedBins>>, Error>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        use Poll::*;
-        'outer: loop {
-            break if self.completed {
-                panic!("IntoBinnedTDefaultStream poll_next on completed");
-            } else if self.errored {
-                self.completed = true;
-                Ready(None)
-            } else if let Some(item) = self.tmp_agg_results.pop_front() {
-                Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
-            } else if self.range_complete_emitted {
-                self.completed = true;
-                Ready(None)
-            } else if self.inp_completed && self.all_bins_emitted {
-                self.range_complete_emitted = true;
-                if self.range_complete_observed {
-                    Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
-                } else {
-                    continue 'outer;
-                }
-            } else {
-                let cur = self.cur(cx);
-                match self.handle(cur) {
-                    Some(item) => item,
-                    None => continue 'outer,
-                }
-            };
-        }
-    }
-}
 
diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs
index 0e50abf..86efa90 100644
--- a/disk/src/agg/eventbatch.rs
+++ b/disk/src/agg/eventbatch.rs
@@ -1,4 +1,3 @@
-use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
 use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::agg::streams::{Appendable, StreamItem};
 use crate::agg::AggregatableXdim1Bin;
@@ -111,18 +110,6 @@ where
     }
 }
 
-impl<SK> AggregatableTdim<SK> for MinMaxAvgScalarEventBatch
-where
-    SK: StreamKind,
-{
-    //type Output = MinMaxAvgScalarBinBatch;
-    type Aggregator = MinMaxAvgScalarEventBatchAggregator;
-
-    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
-        MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2)
-    }
-}
-
 impl MinMaxAvgScalarEventBatch {
     #[allow(dead_code)]
     fn old_serialized(&self) -> Bytes {
@@ -172,85 +159,6 @@ impl MinMaxAvgScalarEventBatchAggregator {
     }
 }
 
-impl<SK> AggregatorTdim<SK> for MinMaxAvgScalarEventBatchAggregator
-where
-    SK: StreamKind,
-{
-    type InputValue = MinMaxAvgScalarEventBatch;
-    type OutputValue = MinMaxAvgScalarBinBatch;
-
-    fn ends_before(&self, inp: &Self::InputValue) -> bool {
-        match inp.tss.last() {
-            Some(&ts) => ts < self.ts1,
-            None => true,
-        }
-    }
-
-    fn ends_after(&self, inp: &Self::InputValue) -> bool {
-        match inp.tss.last() {
-            Some(&ts) => ts >= self.ts2,
-            None => panic!(),
-        }
-    }
-
-    fn starts_after(&self, inp: &Self::InputValue) -> bool {
-        match inp.tss.first() {
-            Some(&ts) => ts >= self.ts2,
-            None => panic!(),
-        }
-    }
-
-    fn ingest(&mut self, v: &mut Self::InputValue) {
-        for i1 in 0..v.tss.len() {
-            let ts = v.tss[i1];
-            if ts < self.ts1 {
-                continue;
-            } else if ts >= self.ts2 {
-                continue;
-            } else {
-                self.count += 1;
-                self.min = self.min.min(v.mins[i1]);
-                self.max = self.max.max(v.maxs[i1]);
-                let x = v.avgs[i1];
-                if x.is_nan() {
-                } else {
-                    if self.sum.is_nan() {
-                        self.sum = x;
-                    } else {
-                        self.sum += x;
-                    }
-                    self.sumc += 1;
-                }
-            }
-        }
-    }
-
-    fn result(self) -> Vec<Self::OutputValue> {
-        let min = if self.min == f32::MAX { f32::NAN } else { self.min };
-        let max = if self.max == f32::MIN { f32::NAN } else { self.max };
-        let avg = if self.sumc == 0 {
-            f32::NAN
-        } else {
-            self.sum / self.sumc as f32
-        };
-
-        // TODO impl problem:
-        // The return type of this function must be the concrete type that I implement for.
-        // Otherwise I have no chance building that values.
-        // I must somehow differently couple that to the SK.
-
-        let v = MinMaxAvgScalarBinBatch {
-            ts1s: vec![self.ts1],
-            ts2s: vec![self.ts2],
-            counts: vec![self.count],
-            mins: vec![min],
-            maxs: vec![max],
-            avgs: vec![avg],
-        };
-        vec![v]
-    }
-}
-
 impl MakeBytesFrame for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error> {
     fn make_bytes_frame(&self) -> Result<Bytes, Error> {
         Ok(make_frame(self)?.freeze())
diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs
index 90fb472..f2d4be1 100644
--- a/disk/src/agg/scalarbinbatch.rs
+++ b/disk/src/agg/scalarbinbatch.rs
@@ -1,4 +1,3 @@
-use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
 use crate::agg::streams::{Appendable, StreamItem, ToJsonBytes};
 use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
 use crate::binned::{MakeBytesFrame, RangeCompletableItem, StreamKind};
@@ -195,18 +194,6 @@ where
     }
 }
 
-impl<SK> AggregatableTdim<SK> for MinMaxAvgScalarBinBatch
-where
-    SK: StreamKind,
-{
-    //type Output = MinMaxAvgScalarBinBatch;
-    type Aggregator = MinMaxAvgScalarBinBatchAggregator;
-
-    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
-        MinMaxAvgScalarBinBatchAggregator::new(ts1, ts2)
-    }
-}
-
 pub struct MinMaxAvgScalarBinBatchAggregator {
     ts1: u64,
     ts2: u64,
@@ -231,80 +218,6 @@ impl MinMaxAvgScalarBinBatchAggregator {
     }
 }
 
-impl<SK> AggregatorTdim<SK> for MinMaxAvgScalarBinBatchAggregator
-where
-    SK: StreamKind,
-{
-    type InputValue = MinMaxAvgScalarBinBatch;
-    type OutputValue = MinMaxAvgScalarBinBatch;
-
-    fn ends_before(&self, inp: &Self::InputValue) -> bool {
-        match inp.ts2s.last() {
-            Some(&ts) => ts <= self.ts1,
-            None => true,
-        }
-    }
-
-    fn ends_after(&self, inp: &Self::InputValue) -> bool {
-        match inp.ts2s.last() {
-            Some(&ts) => ts >= self.ts2,
-            _ => panic!(),
-        }
-    }
-
-    fn starts_after(&self, inp: &Self::InputValue) -> bool {
-        match inp.ts1s.first() {
-            Some(&ts) => ts >= self.ts2,
-            _ => panic!(),
-        }
-    }
-
-    fn ingest(&mut self, v: &mut Self::InputValue) {
-        for i1 in 0..v.ts1s.len() {
-            let ts1 = v.ts1s[i1];
-            let ts2 = v.ts2s[i1];
-            if ts2 <= self.ts1 {
-                continue;
-            } else if ts1 >= self.ts2 {
-                continue;
-            } else {
-                self.count += v.counts[i1];
-                self.min = self.min.min(v.mins[i1]);
-                self.max = self.max.max(v.maxs[i1]);
-                let x = v.avgs[i1];
-                if x.is_nan() {
-                } else {
-                    if self.sum.is_nan() {
-                        self.sum = x;
-                    } else {
-                        self.sum += x;
-                    }
-                    self.sumc += 1;
-                }
-            }
-        }
-    }
-
-    fn result(self) -> Vec<Self::OutputValue> {
-        let min = if self.min == f32::MAX { f32::NAN } else { self.min };
-        let max = if self.max == f32::MIN { f32::NAN } else { self.max };
-        let avg = if self.sumc == 0 {
-            f32::NAN
-        } else {
-            self.sum / self.sumc as f32
-        };
-        let v = MinMaxAvgScalarBinBatch {
-            ts1s: vec![self.ts1],
-            ts2s: vec![self.ts2],
-            counts: vec![self.count],
-            mins: vec![min],
-            maxs: vec![max],
-            avgs: vec![avg],
-        };
-        vec![v]
-    }
-}
-
 impl MakeBytesFrame for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error> {
     fn make_bytes_frame(&self) -> Result<Bytes, Error> {
         Ok(make_frame(self)?.freeze())
diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs
index bb58adb..9c8c1e5 100644
--- a/disk/src/aggtest.rs
+++ b/disk/src/aggtest.rs
@@ -1,5 +1,4 @@
 use super::agg::IntoDim1F32Stream;
-use crate::agg::binnedt::IntoBinnedT;
 use crate::agg::binnedx::IntoBinnedXBins1;
 use crate::binned::BinnedStreamKindScalar;
 use crate::eventblobs::EventBlobsComplete;
@@ -68,13 +67,9 @@ async fn agg_x_dim_0_inner() {
     );
     let fut1 = IntoDim1F32Stream::into_dim_1_f32_stream(fut1);
     let fut1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(fut1);
-    let fut1 = IntoBinnedT::<BinnedStreamKindScalar, _>::into_binned_t(
-        fut1,
-        BinnedRange::covering_range(range, bin_count).unwrap().unwrap(),
-    );
-    let fut1 = fut1
-        //.into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap())
-        .for_each(|_k| ready(()));
+
+    // TODO add the t binning and expectation.
+    let fut1 = fut1.for_each(|_k| ready(()));
     fut1.await;
 }
 
@@ -138,10 +133,8 @@ async fn agg_x_dim_1_inner() {
         //info!("after X binning {:?}", k.as_ref().unwrap());
         k
     });
-    let fut1 = crate::agg::binnedt::IntoBinnedT::<BinnedStreamKindScalar, _>::into_binned_t(
-        fut1,
-        BinnedRange::covering_range(range, bin_count).unwrap().unwrap(),
-    );
+
+    // TODO add T-binning and expectation.
     let fut1 = fut1
         .map(|k| {
             info!("after T binning {:?}", k.as_ref().unwrap());
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index 3e08be4..2fe0bf6 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -1,4 +1,3 @@
-use crate::agg::binnedt::AggregatableTdim;
 use crate::agg::binnedt2::AggregatableTdim2;
 use crate::agg::binnedt3::{Agg3, BinnedT3Stream};
 use crate::agg::binnedt4::{
@@ -707,16 +706,7 @@ pub trait RangeOverlapInfo {
 }
 
 pub trait XBinnedEvents<SK>:
-    Sized
-    + Unpin
-    + Send
-    + Serialize
-    + DeserializeOwned
-    + AggregatableTdim<SK>
-    + WithLen
-    + WithTimestamps
-    + PushableIndex
-    + Appendable
+    Sized + Unpin + Send + Serialize + DeserializeOwned + WithLen + WithTimestamps + PushableIndex + Appendable
 where
     SK: StreamKind,
 {
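
Editor's note, kept outside the patch body: the deleted AggregatorTdim implementations all perform the same per-bin reduction (count, min, max, and a NaN-tolerant average over everything that falls into a half-open time bin [ts1, ts2)). Below is a minimal, self-contained Rust sketch of that reduction for readers who want to see the logic without the trait plumbing. The BinAgg type, its method names, and the plain (timestamp, value) input are illustrative assumptions for this sketch only; they are not part of the crate's API or of this patch.

    // Illustrative stand-in for the removed per-bin aggregation logic.
    // Hypothetical names; events are assumed to be plain (timestamp, value) pairs.
    struct BinAgg {
        ts1: u64, // inclusive bin start
        ts2: u64, // exclusive bin end
        count: u64,
        min: f32,
        max: f32,
        sum: f32,
        sumc: u64,
    }

    impl BinAgg {
        fn new(ts1: u64, ts2: u64) -> Self {
            Self { ts1, ts2, count: 0, min: f32::MAX, max: f32::MIN, sum: f32::NAN, sumc: 0 }
        }

        // Fold one event into the bin; events outside [ts1, ts2) are ignored.
        fn ingest(&mut self, ts: u64, value: f32) {
            if ts < self.ts1 || ts >= self.ts2 {
                return;
            }
            self.count += 1;
            self.min = self.min.min(value);
            self.max = self.max.max(value);
            if !value.is_nan() {
                // Same NaN handling as the removed code: the first finite value
                // replaces the NaN seed, later values are summed.
                if self.sum.is_nan() {
                    self.sum = value;
                } else {
                    self.sum += value;
                }
                self.sumc += 1;
            }
        }

        // (count, min, max, avg), with NaN for empty bins, mirroring the removed result().
        fn finish(self) -> (u64, f32, f32, f32) {
            let min = if self.min == f32::MAX { f32::NAN } else { self.min };
            let max = if self.max == f32::MIN { f32::NAN } else { self.max };
            let avg = if self.sumc == 0 { f32::NAN } else { self.sum / self.sumc as f32 };
            (self.count, min, max, avg)
        }
    }

    fn main() {
        let mut agg = BinAgg::new(100, 200);
        for (ts, v) in [(90, 1.0), (120, 2.0), (150, 4.0), (250, 9.0)] {
            agg.ingest(ts, v);
        }
        // Only the events at ts 120 and 150 fall into the bin.
        assert_eq!(agg.count, 2);
        let (count, min, max, avg) = agg.finish();
        println!("count={} min={} max={} avg={}", count, min, max, avg);
    }

The NaN seed for the sum mirrors the removed implementations: an empty or all-NaN bin averages to NaN rather than 0, which keeps gaps visible when the bins are later plotted.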