This commit is contained in:
Dominik Werder
2021-04-20 18:09:32 +02:00
parent 6801d7219a
commit 6d82e6e8d4
4 changed files with 215 additions and 29 deletions

View File

@@ -1,4 +1,4 @@
use crate::agg::{Dim1F32Stream, ValuesDim1};
use crate::agg::{Dim1F32Stream, MinMaxAvgScalarEventBatch, ValuesDim1};
use crate::EventFull;
use err::Error;
use futures_core::Stream;
@@ -130,3 +130,129 @@ enum CurVal {
Err(Error),
Val(ValuesDim1),
}
/*
============== MergedMinMaxAvgScalarStream
*/
pub struct MergedMinMaxAvgScalarStream<S>
where
S: Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>>,
{
inps: Vec<S>,
current: Vec<MergedMinMaxAvgScalarStreamCurVal>,
ixs: Vec<usize>,
emitted_complete: bool,
batch: MinMaxAvgScalarEventBatch,
}
impl<S> MergedMinMaxAvgScalarStream<S>
where
S: Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>>,
{
pub fn new(inps: Vec<S>) -> Self {
let n = inps.len();
let current = (0..n)
.into_iter()
.map(|_k| MergedMinMaxAvgScalarStreamCurVal::None)
.collect();
Self {
inps,
current: current,
ixs: vec![0; n],
emitted_complete: false,
batch: MinMaxAvgScalarEventBatch::empty(),
}
}
}
impl<S> Stream for MergedMinMaxAvgScalarStream<S>
where
S: Stream<Item = Result<MinMaxAvgScalarEventBatch, Error>> + Unpin,
{
type Item = Result<MinMaxAvgScalarEventBatch, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
if self.emitted_complete {
panic!("poll on complete stream");
}
// can only run logic if all streams are either finished, errored or have some current value.
for i1 in 0..self.inps.len() {
match self.current[i1] {
MergedMinMaxAvgScalarStreamCurVal::None => {
match self.inps[i1].poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(k);
}
Ready(Some(Err(e))) => {
// TODO emit this error, consider this stream as done, anything more to do here?
//self.current[i1] = CurVal::Err(e);
return Ready(Some(Err(e)));
}
Ready(None) => {
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Finish;
}
Pending => {
// TODO is this behaviour correct?
return Pending;
}
}
}
_ => (),
}
}
let mut lowest_ix = usize::MAX;
let mut lowest_ts = u64::MAX;
for i1 in 0..self.inps.len() {
match &self.current[i1] {
MergedMinMaxAvgScalarStreamCurVal::Finish => {}
MergedMinMaxAvgScalarStreamCurVal::Val(val) => {
let u = self.ixs[i1];
if u >= val.tss.len() {
self.ixs[i1] = 0;
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::None;
continue 'outer;
} else {
let ts = val.tss[u];
if ts < lowest_ts {
lowest_ix = i1;
lowest_ts = ts;
}
}
}
_ => panic!(),
}
}
if lowest_ix == usize::MAX {
// TODO all inputs in finished state
break Ready(None);
} else {
info!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix);
self.batch.tss.push(lowest_ts);
let rix = self.ixs[lowest_ix];
let z = match &self.current[lowest_ix] {
MergedMinMaxAvgScalarStreamCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix]),
_ => panic!(),
};
self.batch.mins.push(z.0);
self.batch.maxs.push(z.1);
self.batch.avgs.push(z.2);
self.ixs[lowest_ix] += 1;
}
if self.batch.tss.len() >= 64 {
let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
break Ready(Some(Ok(k)));
}
}
}
}
enum MergedMinMaxAvgScalarStreamCurVal {
None,
Finish,
Val(MinMaxAvgScalarEventBatch),
}