WIP on T agg

This commit is contained in:
Dominik Werder
2021-04-07 22:08:53 +02:00
parent 9f239bd930
commit 8458e28eda

View File

@@ -12,6 +12,7 @@ pub trait AggregatorTdim {
type InputValue;
type OutputValue: AggregatableXdim1Bin + AggregatableTdim;
fn ingest(&mut self, v: &Self::InputValue);
fn result(self) -> Self::OutputValue;
}
pub trait AggregatableXdim1Bin {
@@ -21,7 +22,7 @@ pub trait AggregatableXdim1Bin {
pub trait AggregatableTdim {
type Output: AggregatableXdim1Bin + AggregatableTdim;
type Aggregator: AggregatorTdim;
type Aggregator: AggregatorTdim<InputValue = Self>;
fn aggregator_new(&self) -> Self::Aggregator;
}
@@ -42,6 +43,7 @@ impl AggregatorTdim for () {
type InputValue = ();
type OutputValue = ();
fn ingest(&mut self, v: &Self::InputValue) { todo!() }
fn result(self) -> Self::OutputValue { todo!() }
}
@@ -138,6 +140,9 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
fn ingest(&mut self, v: &Self::InputValue) {
todo!()
}
fn result(self) -> Self::OutputValue { todo!() }
}
@@ -174,6 +179,8 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
todo!()
}
fn result(self) -> Self::OutputValue { todo!() }
}
@@ -209,6 +216,9 @@ impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator {
fn ingest(&mut self, v: &Self::InputValue) {
todo!()
}
fn result(self) -> Self::OutputValue { todo!() }
}
@@ -357,8 +367,10 @@ impl<S, I> IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<I
}
impl<T, I> Stream for IntoBinnedTDefaultStream<T, I> where I: AggregatableTdim, T: Stream<Item=Result<I, Error>> + Unpin, I::Aggregator: Unpin {
type Item = Result<MinMaxAvgScalarBinSingle, Error>;
impl<T, I> Stream for IntoBinnedTDefaultStream<T, I>
where I: AggregatableTdim, T: Stream<Item=Result<I, Error>> + Unpin, I::Aggregator: Unpin
{
type Item = Result<<I::Aggregator as AggregatorTdim>::OutputValue, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
@@ -367,8 +379,9 @@ impl<T, I> Stream for IntoBinnedTDefaultStream<T, I> where I: AggregatableTdim,
if self.aggtor.is_none() {
self.aggtor = Some(k.aggregator_new());
}
let ag = &mut self.aggtor.as_mut().unwrap();
todo!()
let ag = self.aggtor.as_mut().unwrap();
ag.ingest(&k);
Ready(Some(Ok(self.aggtor.take().unwrap().result())))
}
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => Ready(None),