From 8458e28eda692ec6e1e0db2ecee4409aea4d3afa Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 7 Apr 2021 22:08:53 +0200 Subject: [PATCH] WIP on T agg --- disk/src/agg.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/disk/src/agg.rs b/disk/src/agg.rs index a43e884..9dc3520 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -12,6 +12,7 @@ pub trait AggregatorTdim { type InputValue; type OutputValue: AggregatableXdim1Bin + AggregatableTdim; fn ingest(&mut self, v: &Self::InputValue); + fn result(self) -> Self::OutputValue; } pub trait AggregatableXdim1Bin { @@ -21,7 +22,7 @@ pub trait AggregatableXdim1Bin { pub trait AggregatableTdim { type Output: AggregatableXdim1Bin + AggregatableTdim; - type Aggregator: AggregatorTdim; + type Aggregator: AggregatorTdim; fn aggregator_new(&self) -> Self::Aggregator; } @@ -42,6 +43,7 @@ impl AggregatorTdim for () { type InputValue = (); type OutputValue = (); fn ingest(&mut self, v: &Self::InputValue) { todo!() } + fn result(self) -> Self::OutputValue { todo!() } } @@ -138,6 +140,9 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { fn ingest(&mut self, v: &Self::InputValue) { todo!() } + + fn result(self) -> Self::OutputValue { todo!() } + } @@ -174,6 +179,8 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { todo!() } + fn result(self) -> Self::OutputValue { todo!() } + } @@ -209,6 +216,9 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator { fn ingest(&mut self, v: &Self::InputValue) { todo!() } + + fn result(self) -> Self::OutputValue { todo!() } + } @@ -357,8 +367,10 @@ impl IntoBinnedTDefaultStream where I: AggregatableTdim, S: Stream Stream for IntoBinnedTDefaultStream where I: AggregatableTdim, T: Stream> + Unpin, I::Aggregator: Unpin { - type Item = Result; +impl Stream for IntoBinnedTDefaultStream +where I: AggregatableTdim, T: Stream> + Unpin, I::Aggregator: Unpin +{ + type Item = Result<::OutputValue, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; @@ -367,8 +379,9 @@ impl Stream for IntoBinnedTDefaultStream where I: AggregatableTdim, if self.aggtor.is_none() { self.aggtor = Some(k.aggregator_new()); } - let ag = &mut self.aggtor.as_mut().unwrap(); - todo!() + let ag = self.aggtor.as_mut().unwrap(); + ag.ingest(&k); + Ready(Some(Ok(self.aggtor.take().unwrap().result()))) } Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None),