diff --git a/disk/src/agg.rs b/disk/src/agg.rs index e442a5f..a43e884 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -9,7 +9,9 @@ use futures_util::{pin_mut, StreamExt, future::ready}; use netpod::ScalarType; pub trait AggregatorTdim { + type InputValue; type OutputValue: AggregatableXdim1Bin + AggregatableTdim; + fn ingest(&mut self, v: &Self::InputValue); } pub trait AggregatableXdim1Bin { @@ -37,7 +39,9 @@ impl AggregatableTdim for () { } } impl AggregatorTdim for () { + type InputValue = (); type OutputValue = (); + fn ingest(&mut self, v: &Self::InputValue) { todo!() } } @@ -128,7 +132,12 @@ impl MinMaxAvgScalarEventBatchAggregator { } impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { + type InputValue = MinMaxAvgScalarEventBatch; type OutputValue = MinMaxAvgScalarBinBatch; + + fn ingest(&mut self, v: &Self::InputValue) { + todo!() + } } @@ -149,12 +158,25 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch { impl AggregatableTdim for MinMaxAvgScalarBinBatch { type Output = MinMaxAvgScalarBinSingle; - type Aggregator = MinMaxAvgScalarBinSingle; + type Aggregator = MinMaxAvgScalarBinBatchAggregator; fn aggregator_new(&self) -> Self::Aggregator { todo!() } } +pub struct MinMaxAvgScalarBinBatchAggregator {} + +impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { + type InputValue = MinMaxAvgScalarBinBatch; + type OutputValue = MinMaxAvgScalarBinSingle; + + fn ingest(&mut self, v: &Self::InputValue) { + todo!() + } + +} + + pub struct MinMaxAvgScalarBinSingle { ts1: u64, ts2: u64, @@ -165,16 +187,12 @@ pub struct MinMaxAvgScalarBinSingle { impl AggregatableTdim for MinMaxAvgScalarBinSingle { type Output = MinMaxAvgScalarBinSingle; - type Aggregator = MinMaxAvgScalarBinSingle; + type Aggregator = MinMaxAvgScalarBinSingleAggregator; fn aggregator_new(&self) -> Self::Aggregator { todo!() } } -impl AggregatorTdim for MinMaxAvgScalarBinSingle { - type OutputValue = (); -} - impl AggregatableXdim1Bin for MinMaxAvgScalarBinSingle { type Output = MinMaxAvgScalarBinSingle; fn into_agg(self) -> Self::Output { @@ -182,6 +200,17 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinSingle { } } +pub struct MinMaxAvgScalarBinSingleAggregator {} + +impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator { + type InputValue = MinMaxAvgScalarBinSingle; + type OutputValue = MinMaxAvgScalarBinSingle; + + fn ingest(&mut self, v: &Self::InputValue) { + todo!() + } +} + @@ -303,8 +332,7 @@ pub trait IntoBinnedT { fn into_binned_t(self) -> Self::StreamOut; } -impl IntoBinnedT for T where I: AggregatableTdim, T: Stream> + Unpin { - //type Bla = <::Aggregator as AggregatorTdim>::OutputValue; +impl IntoBinnedT for T where I: AggregatableTdim + Unpin, T: Stream> + Unpin, I::Aggregator: Unpin { type StreamOut = IntoBinnedTDefaultStream; fn into_binned_t(self) -> Self::StreamOut { @@ -315,6 +343,7 @@ impl IntoBinnedT for T where I: AggregatableTdim, T: Stream where I: AggregatableTdim, S: Stream> { inp: S, + aggtor: Option, } impl IntoBinnedTDefaultStream where I: AggregatableTdim, S: Stream> { @@ -322,18 +351,23 @@ impl IntoBinnedTDefaultStream where I: AggregatableTdim, S: Stream Self { Self { inp: inp, + aggtor: None, } } } -impl Stream for IntoBinnedTDefaultStream where I: AggregatableTdim, T: Stream> + Unpin { +impl Stream for IntoBinnedTDefaultStream where I: AggregatableTdim, T: Stream> + Unpin, I::Aggregator: Unpin { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { + if self.aggtor.is_none() { + self.aggtor = Some(k.aggregator_new()); + } + let ag = &mut self.aggtor.as_mut().unwrap(); todo!() } Ready(Some(Err(e))) => Ready(Some(Err(e))),