From 0ab17d35da7f61815eb8ddcd205a3b38205ae9c6 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 26 May 2021 14:43:06 +0200 Subject: [PATCH] Transition to more specific stage 1 binning, compiles --- disk/src/agg.rs | 1 + disk/src/agg/binnedt.rs | 88 ++++++++++++++++---------------- disk/src/agg/binnedt3.rs | 101 +++++++++++++++++++++++++++++++++++++ disk/src/agg/eventbatch.rs | 6 +++ disk/src/agg/streams.rs | 100 ------------------------------------ disk/src/aggtest.rs | 4 +- disk/src/binned.rs | 24 ++++++++- disk/src/cache/pbv.rs | 2 +- disk/src/raw/bffr.rs | 2 +- 9 files changed, 179 insertions(+), 149 deletions(-) create mode 100644 disk/src/agg/binnedt3.rs diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 13c748a..4e7269c 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -20,6 +20,7 @@ use std::time::{Duration, Instant}; pub mod binnedt; pub mod binnedt2; +pub mod binnedt3; pub mod binnedx; pub mod eventbatch; pub mod scalarbinbatch; diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index ba95e52..8ac803a 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -15,7 +15,7 @@ where SK: BinnedStreamKind, { type InputValue; - type OutputValue: AggregatableXdim1Bin + AggregatableTdim + Unpin; + type OutputValue; fn ends_before(&self, inp: &Self::InputValue) -> bool; fn ends_after(&self, inp: &Self::InputValue) -> bool; fn starts_after(&self, inp: &Self::InputValue) -> bool; @@ -27,67 +27,63 @@ pub trait AggregatableTdim: Sized where SK: BinnedStreamKind, { - //type Output: AggregatableXdim1Bin + AggregatableTdim; - type Aggregator: AggregatorTdim::TBinnedBins>; + type Aggregator: AggregatorTdim; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator; } -pub trait IntoBinnedT +pub trait IntoBinnedT where SK: BinnedStreamKind, + S: Stream::XBinnedEvents>>, Error>> + Unpin, { - type StreamOut: Stream< - Item = Result::TBinnedBins>>, Error>, - >; - fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut; + fn into_binned_t(self, spec: BinnedRange) -> IntoBinnedTDefaultStream; } -impl IntoBinnedT for S +impl IntoBinnedT for S where SK: BinnedStreamKind, - S: Stream>, Error>> + Unpin, - I: AggregatableTdim + Unpin, - I::Aggregator: Unpin, + S: Stream::XBinnedEvents>>, Error>> + Unpin, { - type StreamOut = IntoBinnedTDefaultStream; - - fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut { + fn into_binned_t(self, spec: BinnedRange) -> IntoBinnedTDefaultStream { IntoBinnedTDefaultStream::new(self, spec) } } -pub struct IntoBinnedTDefaultStream +pub struct IntoBinnedTDefaultStream where - S: Stream>, Error>> + Unpin, - I: AggregatableTdim, SK: BinnedStreamKind, + S: Stream::XBinnedEvents>>, Error>> + Unpin, { inp: S, - aggtor: Option, + aggtor: Option<<::XBinnedEvents as AggregatableTdim>::Aggregator>, spec: BinnedRange, curbin: u32, inp_completed: bool, all_bins_emitted: bool, range_complete_observed: bool, range_complete_emitted: bool, - left: Option>, Error>>>>, + left: + Option::XBinnedEvents>>, Error>>>>, errored: bool, completed: bool, - tmp_agg_results: VecDeque<>::OutputValue>, + tmp_agg_results: VecDeque<::TBinnedBins>, _marker: std::marker::PhantomData, } -impl IntoBinnedTDefaultStream +impl IntoBinnedTDefaultStream where - S: Stream>, Error>> + Unpin, - I: AggregatableTdim, SK: BinnedStreamKind, + S: Stream::XBinnedEvents>>, Error>> + Unpin, { pub fn new(inp: S, spec: BinnedRange) -> Self { let range = spec.get_range(0); Self { inp, - aggtor: Some(I::aggregator_new_static(range.beg, range.end)), + aggtor: Some( + 
<::XBinnedEvents as AggregatableTdim>::aggregator_new_static( + range.beg, range.end, + ), + ), spec, curbin: 0, inp_completed: false, @@ -102,7 +98,10 @@ where } } - fn cur(&mut self, cx: &mut Context) -> Poll>, Error>>> { + fn cur( + &mut self, + cx: &mut Context, + ) -> Poll::XBinnedEvents>>, Error>>> { if let Some(cur) = self.left.take() { cur } else if self.inp_completed { @@ -118,11 +117,16 @@ where let range = self.spec.get_range(self.curbin); let ret = self .aggtor - .replace(I::aggregator_new_static(range.beg, range.end)) + .replace( + <::XBinnedEvents as AggregatableTdim>::aggregator_new_static( + range.beg, range.end, + ), + ) // TODO handle None case, or remove Option if Agg is always present .unwrap() .result(); - self.tmp_agg_results = ret.into(); + //self.tmp_agg_results = VecDeque::from(ret); + self.tmp_agg_results = VecDeque::new(); if self.curbin >= self.spec.count as u32 { self.all_bins_emitted = true; } @@ -130,12 +134,9 @@ where fn handle( &mut self, - cur: Poll>, Error>>>, - ) -> Option< - Poll< - Option>::OutputValue>>, Error>>, - >, - > { + cur: Poll::XBinnedEvents>>, Error>>>, + ) -> Option::TBinnedBins>>, Error>>>> + { use Poll::*; match cur { Ready(Some(Ok(item))) => match item { @@ -153,9 +154,11 @@ where None } else { let ag = self.aggtor.as_mut().unwrap(); - if ag.ends_before(&item) { + //if ag.ends_before(&item) { + if ag.ends_before(err::todoval()) { None - } else if ag.starts_after(&item) { + //} else if ag.starts_after(&item) { + } else if ag.starts_after(err::todoval()) { self.left = Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); self.cycle_current_bin(); @@ -163,9 +166,11 @@ where None } else { let mut item = item; - ag.ingest(&mut item); + //ag.ingest(&mut item); + ag.ingest(err::todoval()); let item = item; - if ag.ends_after(&item) { + //if ag.ends_after(&item) { + if ag.ends_after(err::todoval()) { self.left = Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); self.cycle_current_bin(); @@ -196,14 +201,11 @@ where } } -impl Stream for IntoBinnedTDefaultStream +impl Stream for IntoBinnedTDefaultStream where - S: Stream>, Error>> + Unpin, - I: AggregatableTdim + Unpin, - I::Aggregator: Unpin, SK: BinnedStreamKind, + S: Stream::XBinnedEvents>>, Error>> + Unpin, { - //type Item = Result::OutputValue>>, Error>; type Item = Result::TBinnedBins>>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { diff --git a/disk/src/agg/binnedt3.rs b/disk/src/agg/binnedt3.rs new file mode 100644 index 0000000..83a0e1d --- /dev/null +++ b/disk/src/agg/binnedt3.rs @@ -0,0 +1,101 @@ +use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; +use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; +use crate::agg::streams::StreamItem; +use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use err::Error; +use futures_core::Stream; +use netpod::BinnedRange; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub trait Aggregator3Tdim { + type InputValue; + type OutputValue; +} + +pub struct Agg3 {} + +impl Aggregator3Tdim for Agg3 { + type InputValue = MinMaxAvgScalarEventBatch; + type OutputValue = MinMaxAvgScalarBinBatch; +} + +pub struct BinnedT3Stream { + // TODO get rid of box: + inp: Pin + Send>>, + //aggtor: Option<<::XBinnedEvents as AggregatableTdim>::Aggregator>, + aggtor: Option<()>, + spec: BinnedRange, + curbin: u32, + inp_completed: bool, + all_bins_emitted: bool, + range_complete_observed: bool, + range_complete_emitted: bool, + //left: 
Option::XBinnedEvents>>, Error>>>>,
+ left: Option<()>,
+ errored: bool,
+ completed: bool,
+ tmp_agg_results: VecDeque<MinMaxAvgScalarBinBatch>,
+}
+
+impl BinnedT3Stream {
+ pub fn new<S>(inp: S, spec: BinnedRange) -> Self
+ where
+ S: Stream + Send + 'static,
+ {
+ // TODO simplify here, get rid of numeric parameter:
+ let range = spec.get_range(0);
+ Self {
+ inp: Box::pin(inp),
+ aggtor: None,
+ spec,
+ curbin: 0,
+ inp_completed: false,
+ all_bins_emitted: false,
+ range_complete_observed: false,
+ range_complete_emitted: false,
+ left: None,
+ errored: false,
+ completed: false,
+ tmp_agg_results: VecDeque::new(),
+ }
+ }
+}
+
+impl Stream for BinnedT3Stream {
+ type Item = Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+ use Poll::*;
+ 'outer: loop {
+ break if self.completed {
+ panic!("IntoBinnedTDefaultStream poll_next on completed");
+ } else if self.errored {
+ self.completed = true;
+ Ready(None)
+ } else if let Some(item) = self.tmp_agg_results.pop_front() {
+ Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
+ } else if self.range_complete_emitted {
+ self.completed = true;
+ Ready(None)
+ } else if self.inp_completed && self.all_bins_emitted {
+ self.range_complete_emitted = true;
+ if self.range_complete_observed {
+ Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
+ } else {
+ continue 'outer;
+ }
+ } else {
+ err::todo();
+ Pending
+ // TODO `cur` and `handle` are not yet taken over from binnedt.rs
+ /*let cur = self.cur(cx);
+ match self.handle(cur) {
+ Some(item) => item,
+ None => continue 'outer,
+ }*/
+ };
+ }
+ }
+}
diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs
index eff5224..779ff67 100644
--- a/disk/src/agg/eventbatch.rs
+++ b/disk/src/agg/eventbatch.rs
@@ -232,6 +232,12 @@ where
 } else {
 self.sum / self.sumc as f32
 };
+
+ // TODO impl problem:
+ // The return type of this function must be the concrete type that I implement for.
+ // Otherwise I have no way to build those values.
+ // I must couple that to the SK differently.
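+ // A possible direction (sketch only, not wired up in this patch): since
+ // BinnedStreamKind already declares `type TBinnedBins`, the aggregator could
+ // be coupled to the stream kind so that `result()` returns
+ // `<SK as BinnedStreamKind>::TBinnedBins` instead of hard-coding
+ // MinMaxAvgScalarBinBatch; see the standalone sketch after the diff.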
+ let v = MinMaxAvgScalarBinBatch { ts1s: vec![self.ts1], ts2s: vec![self.ts2], diff --git a/disk/src/agg/streams.rs b/disk/src/agg/streams.rs index d1ffcb7..81d904b 100644 --- a/disk/src/agg/streams.rs +++ b/disk/src/agg/streams.rs @@ -36,103 +36,3 @@ pub trait ToJsonResult { type Output; fn to_json_result(&self) -> Result; } - -impl AggregatableXdim1Bin for StreamItem -where - SK: BinnedStreamKind, - T: AggregatableTdim + AggregatableXdim1Bin, -{ - type Output = StreamItem<>::Output>; - - fn into_agg(self) -> Self::Output { - match self { - Self::Log(item) => Self::Output::Log(item), - Self::Stats(item) => Self::Output::Stats(item), - Self::DataItem(item) => Self::Output::DataItem(item.into_agg()), - } - } -} - -pub struct StreamItemAggregator -where - T: AggregatableTdim, - SK: BinnedStreamKind, -{ - inner_agg: >::Aggregator, -} - -impl StreamItemAggregator -where - T: AggregatableTdim, - SK: BinnedStreamKind, -{ - pub fn new(ts1: u64, ts2: u64) -> Self { - Self { - inner_agg: >::aggregator_new_static(ts1, ts2), - } - } -} - -impl AggregatorTdim for StreamItemAggregator -where - T: AggregatableTdim, - SK: BinnedStreamKind, -{ - type InputValue = StreamItem; - type OutputValue = StreamItem<<>::Aggregator as AggregatorTdim>::OutputValue>; - - fn ends_before(&self, inp: &Self::InputValue) -> bool { - match inp { - StreamItem::Log(_) => false, - StreamItem::Stats(_) => false, - StreamItem::DataItem(item) => self.inner_agg.ends_before(item), - } - } - - fn ends_after(&self, inp: &Self::InputValue) -> bool { - match inp { - StreamItem::Log(_) => false, - StreamItem::Stats(_) => false, - StreamItem::DataItem(item) => self.inner_agg.ends_after(item), - } - } - - fn starts_after(&self, inp: &Self::InputValue) -> bool { - match inp { - StreamItem::Log(_) => false, - StreamItem::Stats(_) => false, - StreamItem::DataItem(item) => self.inner_agg.starts_after(item), - } - } - - fn ingest(&mut self, inp: &mut Self::InputValue) { - match inp { - StreamItem::Log(_) => {} - StreamItem::Stats(_) => {} - StreamItem::DataItem(item) => { - self.inner_agg.ingest(item); - } - } - } - - fn result(self) -> Vec { - self.inner_agg - .result() - .into_iter() - .map(|k| StreamItem::DataItem(k)) - .collect() - } -} - -impl AggregatableTdim for StreamItem -where - T: AggregatableTdim, - SK: BinnedStreamKind, -{ - //type Output = StreamItem< as AggregatorTdim>::OutputValue>; - type Aggregator = StreamItemAggregator; - - fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { - Self::Aggregator::new(ts1, ts2) - } -} diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 790daf9..46ea031 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -68,7 +68,7 @@ async fn agg_x_dim_0_inner() { ); let fut1 = IntoDim1F32Stream::into_dim_1_f32_stream(fut1); let fut1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(fut1); - let fut1 = IntoBinnedT::::into_binned_t( + let fut1 = IntoBinnedT::::into_binned_t( fut1, BinnedRange::covering_range(range, bin_count).unwrap().unwrap(), ); @@ -138,7 +138,7 @@ async fn agg_x_dim_1_inner() { //info!("after X binning {:?}", k.as_ref().unwrap()); k }); - let fut1 = crate::agg::binnedt::IntoBinnedT::::into_binned_t( + let fut1 = crate::agg::binnedt::IntoBinnedT::::into_binned_t( fut1, BinnedRange::covering_range(range, bin_count).unwrap().unwrap(), ); diff --git a/disk/src/binned.rs b/disk/src/binned.rs index da523da..38b903e 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,5 +1,6 @@ use crate::agg::binnedt::{AggregatableTdim, 
AggregatorTdim, IntoBinnedT}; use crate::agg::binnedt2::AggregatableTdim2; +use crate::agg::binnedt3::{Agg3, BinnedT3Stream}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator}; use crate::agg::streams::{Collectable, Collected, StreamItem, ToJsonResult}; @@ -77,6 +78,9 @@ impl Collected for MinMaxAvgScalarBinBatchCollected { pub struct MinMaxAvgScalarBinBatchCollectedJsonResult { ts_bin_edges: Vec, counts: Vec, + mins: Vec, + maxs: Vec, + avgs: Vec, #[serde(skip_serializing_if = "Bool::is_false")] finalised_range: bool, #[serde(skip_serializing_if = "Zero::is_zero")] @@ -108,6 +112,9 @@ impl ToJsonResult for MinMaxAvgScalarBinBatchCollected { }; let ret = MinMaxAvgScalarBinBatchCollectedJsonResult { counts: self.batch.counts.clone(), + mins: self.batch.mins.clone(), + maxs: self.batch.maxs.clone(), + avgs: self.batch.avgs.clone(), missing_bins: self.bin_count_exp - self.batch.ts1s.len() as u32, finalised_range: self.finalised_range, ts_bin_edges: tsa, @@ -531,6 +538,8 @@ pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static + 'static; type XBinnedEvents: XBinnedEvents; type TBinnedBins: TBinnedBins; + type XBinnedToTBinnedAggregator; + type XBinnedToTBinnedStream; fn new_binned_from_prebinned( &self, @@ -547,6 +556,11 @@ pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static range: BinnedRange, node_config: &NodeConfigCached, ) -> Result; + + fn xbinned_to_tbinned(inp: S, spec: BinnedRange) -> Self::XBinnedToTBinnedStream +/*where + S: Stream::XBinnedEvents>>, Error>> + + Unpin*/; } #[derive(Clone)] @@ -578,6 +592,8 @@ impl BinnedStreamKind for BinnedStreamKindScalar { type TBinnedStreamType = BoxedStream>, Error>>; type XBinnedEvents = MinMaxAvgScalarEventBatch; type TBinnedBins = MinMaxAvgScalarBinBatch; + type XBinnedToTBinnedAggregator = Agg3; + type XBinnedToTBinnedStream = BinnedT3Stream; fn new_binned_from_prebinned( &self, @@ -607,10 +623,14 @@ impl BinnedStreamKind for BinnedStreamKindScalar { node_config: &NodeConfigCached, ) -> Result { let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone()); - // TODO use the binned2 instead - let s = crate::agg::binnedt::IntoBinnedT::::into_binned_t(s, range); + let s = Self::xbinned_to_tbinned(s, range); + //let s = crate::agg::binnedt::IntoBinnedT::::into_binned_t(s, range); Ok(BoxedStream::new(Box::pin(s))?) } + + fn xbinned_to_tbinned(inp: S, spec: BinnedRange) -> Self::XBinnedToTBinnedStream { + err::todoval() + } } // TODO this code is needed somewhere: diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index ee9342e..a5463d2 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -167,7 +167,7 @@ where self.node_config.node_config.cluster.clone(), self.stream_kind.clone(), ); - let s1 = crate::agg::binnedt::IntoBinnedT::::into_binned_t(s1, range); + let s1 = crate::agg::binnedt::IntoBinnedT::::into_binned_t(s1, range); //self.fut2 = Some(Box::pin(s1)); self.fut2 = err::todoval(); } diff --git a/disk/src/raw/bffr.rs b/disk/src/raw/bffr.rs index f816f84..f4ebf1b 100644 --- a/disk/src/raw/bffr.rs +++ b/disk/src/raw/bffr.rs @@ -66,7 +66,7 @@ where >, >( &frame, - <::XBinnedEvents as XBinnedEvents>::frame_type(), + <::XBinnedEvents as XBinnedEvents>::frame_type(), ) { Ok(item) => match item { Ok(item) => Ready(Some(Ok(item))),
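
A note on the TODO added in disk/src/agg/eventbatch.rs ("the return type of this function must be the concrete type that I implement for"): below is a minimal, self-contained sketch, separate from the patch, of how the x-binned-to-t-binned aggregation can be coupled to the stream kind through associated types, which seems to be the direction the new XBinnedToTBinnedAggregator and XBinnedToTBinnedStream associated types on BinnedStreamKind point to. All names in the sketch (TimeBinAggregator, ScalarEvents, ScalarBin, ScalarKind, bin_one) are illustrative stand-ins, not types from this crate.

/// Per-bin aggregator: consumes x-binned events, emits one t-binned value.
trait TimeBinAggregator {
    type Input;
    type Output;
    fn new(ts1: u64, ts2: u64) -> Self;
    fn ingest(&mut self, inp: &Self::Input);
    fn result(self) -> Self::Output;
}

/// The stream kind ties events, bins and the aggregator together, so the
/// aggregator's output is `Self::TBinnedBins` rather than a hard-coded type.
trait BinnedStreamKind {
    type XBinnedEvents;
    type TBinnedBins;
    type XToTAggregator: TimeBinAggregator<Input = Self::XBinnedEvents, Output = Self::TBinnedBins>;
}

/// Illustrative scalar event batch (stand-in for MinMaxAvgScalarEventBatch).
struct ScalarEvents {
    tss: Vec<u64>,
    avgs: Vec<f32>,
}

/// Illustrative scalar bin (stand-in for MinMaxAvgScalarBinBatch).
#[derive(Debug)]
struct ScalarBin {
    ts1: u64,
    ts2: u64,
    count: u64,
    avg: f32,
}

struct ScalarAggregator {
    ts1: u64,
    ts2: u64,
    count: u64,
    sum: f32,
}

impl TimeBinAggregator for ScalarAggregator {
    type Input = ScalarEvents;
    type Output = ScalarBin;

    fn new(ts1: u64, ts2: u64) -> Self {
        Self { ts1, ts2, count: 0, sum: 0.0 }
    }

    fn ingest(&mut self, inp: &Self::Input) {
        // Fold every event that falls into [ts1, ts2) into the running sum.
        for (&ts, &avg) in inp.tss.iter().zip(inp.avgs.iter()) {
            if ts >= self.ts1 && ts < self.ts2 {
                self.count += 1;
                self.sum += avg;
            }
        }
    }

    fn result(self) -> Self::Output {
        let avg = if self.count == 0 { f32::NAN } else { self.sum / self.count as f32 };
        ScalarBin { ts1: self.ts1, ts2: self.ts2, count: self.count, avg }
    }
}

struct ScalarKind;

impl BinnedStreamKind for ScalarKind {
    type XBinnedEvents = ScalarEvents;
    type TBinnedBins = ScalarBin;
    type XToTAggregator = ScalarAggregator;
}

/// Generic over the stream kind: the return type is the kind's own bin type,
/// so no concrete batch type is named here.
fn bin_one<SK: BinnedStreamKind>(ts1: u64, ts2: u64, events: &SK::XBinnedEvents) -> SK::TBinnedBins {
    let mut agg = <SK::XToTAggregator as TimeBinAggregator>::new(ts1, ts2);
    agg.ingest(events);
    agg.result()
}

fn main() {
    let events = ScalarEvents { tss: vec![10, 20, 30], avgs: vec![1.0, 2.0, 3.0] };
    println!("{:?}", bin_one::<ScalarKind>(0, 25, &events));
}

With this shape, result() returns Self::Output = SK::TBinnedBins, so the generic binning stream never has to name a concrete batch type in its own signature.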