This commit is contained in:
Dominik Werder
2021-04-07 21:43:34 +02:00
parent 93999c53bb
commit 9f239bd930
+43 -9
View File
@@ -9,7 +9,9 @@ use futures_util::{pin_mut, StreamExt, future::ready};
use netpod::ScalarType; use netpod::ScalarType;
pub trait AggregatorTdim { pub trait AggregatorTdim {
type InputValue;
type OutputValue: AggregatableXdim1Bin + AggregatableTdim; type OutputValue: AggregatableXdim1Bin + AggregatableTdim;
fn ingest(&mut self, v: &Self::InputValue);
} }
pub trait AggregatableXdim1Bin { pub trait AggregatableXdim1Bin {
@@ -37,7 +39,9 @@ impl AggregatableTdim for () {
} }
} }
impl AggregatorTdim for () { impl AggregatorTdim for () {
type InputValue = ();
type OutputValue = (); type OutputValue = ();
fn ingest(&mut self, v: &Self::InputValue) { todo!() }
} }
@@ -128,7 +132,12 @@ impl MinMaxAvgScalarEventBatchAggregator {
} }
impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
type InputValue = MinMaxAvgScalarEventBatch;
type OutputValue = MinMaxAvgScalarBinBatch; type OutputValue = MinMaxAvgScalarBinBatch;
fn ingest(&mut self, v: &Self::InputValue) {
todo!()
}
} }
@@ -149,12 +158,25 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch {
impl AggregatableTdim for MinMaxAvgScalarBinBatch { impl AggregatableTdim for MinMaxAvgScalarBinBatch {
type Output = MinMaxAvgScalarBinSingle; type Output = MinMaxAvgScalarBinSingle;
type Aggregator = MinMaxAvgScalarBinSingle; type Aggregator = MinMaxAvgScalarBinBatchAggregator;
fn aggregator_new(&self) -> Self::Aggregator { fn aggregator_new(&self) -> Self::Aggregator {
todo!() todo!()
} }
} }
pub struct MinMaxAvgScalarBinBatchAggregator {}
impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
type InputValue = MinMaxAvgScalarBinBatch;
type OutputValue = MinMaxAvgScalarBinSingle;
fn ingest(&mut self, v: &Self::InputValue) {
todo!()
}
}
pub struct MinMaxAvgScalarBinSingle { pub struct MinMaxAvgScalarBinSingle {
ts1: u64, ts1: u64,
ts2: u64, ts2: u64,
@@ -165,16 +187,12 @@ pub struct MinMaxAvgScalarBinSingle {
impl AggregatableTdim for MinMaxAvgScalarBinSingle { impl AggregatableTdim for MinMaxAvgScalarBinSingle {
type Output = MinMaxAvgScalarBinSingle; type Output = MinMaxAvgScalarBinSingle;
type Aggregator = MinMaxAvgScalarBinSingle; type Aggregator = MinMaxAvgScalarBinSingleAggregator;
fn aggregator_new(&self) -> Self::Aggregator { fn aggregator_new(&self) -> Self::Aggregator {
todo!() todo!()
} }
} }
impl AggregatorTdim for MinMaxAvgScalarBinSingle {
type OutputValue = ();
}
impl AggregatableXdim1Bin for MinMaxAvgScalarBinSingle { impl AggregatableXdim1Bin for MinMaxAvgScalarBinSingle {
type Output = MinMaxAvgScalarBinSingle; type Output = MinMaxAvgScalarBinSingle;
fn into_agg(self) -> Self::Output { fn into_agg(self) -> Self::Output {
@@ -182,6 +200,17 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinSingle {
} }
} }
pub struct MinMaxAvgScalarBinSingleAggregator {}
impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator {
type InputValue = MinMaxAvgScalarBinSingle;
type OutputValue = MinMaxAvgScalarBinSingle;
fn ingest(&mut self, v: &Self::InputValue) {
todo!()
}
}
@@ -303,8 +332,7 @@ pub trait IntoBinnedT {
fn into_binned_t(self) -> Self::StreamOut; fn into_binned_t(self) -> Self::StreamOut;
} }
impl<T, I> IntoBinnedT for T where I: AggregatableTdim, T: Stream<Item=Result<I, Error>> + Unpin { impl<T, I> IntoBinnedT for T where I: AggregatableTdim + Unpin, T: Stream<Item=Result<I, Error>> + Unpin, I::Aggregator: Unpin {
//type Bla = <<I as AggregatableTdim>::Aggregator as AggregatorTdim>::OutputValue;
type StreamOut = IntoBinnedTDefaultStream<T, I>; type StreamOut = IntoBinnedTDefaultStream<T, I>;
fn into_binned_t(self) -> Self::StreamOut { fn into_binned_t(self) -> Self::StreamOut {
@@ -315,6 +343,7 @@ impl<T, I> IntoBinnedT for T where I: AggregatableTdim, T: Stream<Item=Result<I,
pub struct IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<Item=Result<I, Error>> { pub struct IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<Item=Result<I, Error>> {
inp: S, inp: S,
aggtor: Option<I::Aggregator>,
} }
impl<S, I> IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<Item=Result<I, Error>> { impl<S, I> IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<Item=Result<I, Error>> {
@@ -322,18 +351,23 @@ impl<S, I> IntoBinnedTDefaultStream<S, I> where I: AggregatableTdim, S: Stream<I
pub fn new(inp: S) -> Self { pub fn new(inp: S) -> Self {
Self { Self {
inp: inp, inp: inp,
aggtor: None,
} }
} }
} }
impl<T, I> Stream for IntoBinnedTDefaultStream<T, I> where I: AggregatableTdim, T: Stream<Item=Result<I, Error>> + Unpin { impl<T, I> Stream for IntoBinnedTDefaultStream<T, I> where I: AggregatableTdim, T: Stream<Item=Result<I, Error>> + Unpin, I::Aggregator: Unpin {
type Item = Result<MinMaxAvgScalarBinSingle, Error>; type Item = Result<MinMaxAvgScalarBinSingle, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*; use Poll::*;
match self.inp.poll_next_unpin(cx) { match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => { Ready(Some(Ok(k))) => {
if self.aggtor.is_none() {
self.aggtor = Some(k.aggregator_new());
}
let ag = &mut self.aggtor.as_mut().unwrap();
todo!() todo!()
} }
Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(Some(Err(e))) => Ready(Some(Err(e))),