Have to require the OutputValue of AggregatorTdim to be Unpin

This commit is contained in:
Dominik Werder
2021-05-04 17:08:03 +02:00
parent 789f28af3f
commit bf08893a98
5 changed files with 89 additions and 29 deletions

View File

@@ -4,6 +4,7 @@ use futures_core::Stream;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::BinnedRange;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -38,6 +39,7 @@ where
errored: bool,
completed: bool,
inp_completed: bool,
tmp_agg_results: VecDeque<<I::Aggregator as AggregatorTdim>::OutputValue>,
}
impl<S, I> IntoBinnedTDefaultStream<S, I>
@@ -56,14 +58,15 @@ where
errored: false,
completed: false,
inp_completed: false,
tmp_agg_results: VecDeque::new(),
}
}
}
impl<T, I> Stream for IntoBinnedTDefaultStream<T, I>
impl<S, I> Stream for IntoBinnedTDefaultStream<S, I>
where
I: AggregatableTdim + Unpin,
T: Stream<Item = Result<I, Error>> + Unpin,
S: Stream<Item = Result<I, Error>> + Unpin,
I::Aggregator: Unpin,
{
type Item = Result<<I::Aggregator as AggregatorTdim>::OutputValue, Error>;
@@ -78,6 +81,9 @@ where
return Ready(None);
}
'outer: loop {
if let Some(item) = self.tmp_agg_results.pop_front() {
return Ready(Some(Ok(item)));
}
let cur = if let Some(k) = self.left.take() {
k
} else if self.inp_completed {
@@ -102,7 +108,9 @@ where
.replace(I::aggregator_new_static(range.beg, range.end))
.unwrap()
.result();
Ready(Some(Ok(ret)))
//Ready(Some(Ok(ret)))
self.tmp_agg_results = ret.into();
continue 'outer;
} else {
//info!("INGEST");
let mut k = k;
@@ -119,7 +127,9 @@ where
.replace(I::aggregator_new_static(range.beg, range.end))
.unwrap()
.result();
Ready(Some(Ok(ret)))
//Ready(Some(Ok(ret)))
self.tmp_agg_results = ret.into();
continue 'outer;
} else {
//info!("ENDS WITHIN");
continue 'outer;
@@ -140,7 +150,12 @@ where
self.curbin += 1;
let range = self.spec.get_range(self.curbin);
match self.aggtor.replace(I::aggregator_new_static(range.beg, range.end)) {
Some(ag) => Ready(Some(Ok(ag.result()))),
Some(ag) => {
let ret = ag.result();
//Ready(Some(Ok(ag.result())))
self.tmp_agg_results = ret.into();
continue 'outer;
}
None => {
panic!();
}

View File

@@ -250,7 +250,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
}
}
fn result(mut self) -> Self::OutputValue {
fn result(mut self) -> Vec<Self::OutputValue> {
let min = if self.min == f32::MAX { f32::NAN } else { self.min };
let max = if self.max == f32::MIN { f32::NAN } else { self.max };
let avg = if self.count == 0 {
@@ -258,7 +258,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
} else {
self.sum / self.count as f32
};
MinMaxAvgScalarBinBatch {
let v = MinMaxAvgScalarBinBatch {
ts1s: vec![self.ts1],
ts2s: vec![self.ts2],
counts: vec![self.count],
@@ -268,6 +268,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()),
values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()),
range_complete_observed: self.range_complete_observed,
}
};
vec![v]
}
}

View File

@@ -280,7 +280,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
}
}
fn result(mut self) -> Self::OutputValue {
fn result(mut self) -> Vec<Self::OutputValue> {
let min = if self.min == f32::MAX { f32::NAN } else { self.min };
let max = if self.max == f32::MIN { f32::NAN } else { self.max };
let avg = if self.sumc == 0 {
@@ -288,7 +288,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
} else {
self.sum / self.sumc as f32
};
MinMaxAvgScalarBinBatch {
let v = MinMaxAvgScalarBinBatch {
ts1s: vec![self.ts1],
ts2s: vec![self.ts2],
counts: vec![self.count],
@@ -298,6 +298,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()),
values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()),
range_complete_observed: self.range_complete_observed,
}
};
vec![v]
}
}