WIP on merger

This commit is contained in:
Dominik Werder
2021-05-05 14:44:08 +02:00
parent e80a6c1688
commit 226c2ac6f3
4 changed files with 199 additions and 129 deletions

View File

@@ -18,11 +18,12 @@ pub trait AggregatorTdim {
fn result(self) -> Vec<Self::OutputValue>;
}
pub trait AggregatableTdim {
pub trait AggregatableTdim: Sized {
type Output: AggregatableXdim1Bin + AggregatableTdim;
type Aggregator: AggregatorTdim<InputValue = Self>;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
fn is_range_complete(&self) -> bool;
fn make_range_complete_item() -> Option<Self>;
}
pub trait IntoBinnedT {
@@ -52,6 +53,8 @@ where
aggtor: Option<I::Aggregator>,
spec: BinnedRange,
curbin: u32,
range_complete: bool,
range_complete_emitted: bool,
left: Option<Poll<Option<Result<I, Error>>>>,
errored: bool,
completed: bool,
@@ -71,6 +74,8 @@ where
aggtor: Some(I::aggregator_new_static(range.beg, range.end)),
spec,
curbin: 0,
range_complete: false,
range_complete_emitted: false,
left: None,
errored: false,
completed: false,
@@ -104,42 +109,35 @@ where
let cur = if let Some(k) = self.left.take() {
k
} else if self.inp_completed {
Ready(None)
if self.range_complete {
self.range_complete_emitted = true;
// TODO why can't I declare that type?
//type TT = <I::Aggregator as AggregatorTdim>::OutputValue;
if let Some(k) = <I::Aggregator as AggregatorTdim>::OutputValue::make_range_complete_item() {
return Ready(Some(Ok(k)));
} else {
warn!("IntoBinnedTDefaultStream should emit RangeComplete but I doesn't have one");
Ready(None)
}
} else {
Ready(None)
}
} else {
let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll");
inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx))
};
break match cur {
Ready(Some(Ok(k))) => {
// TODO need some trait to know whether the incoming item is a RangeComplete
err::todo();
let ag = self.aggtor.as_mut().unwrap();
if ag.ends_before(&k) {
//info!("ENDS BEFORE");
continue 'outer;
} else if ag.starts_after(&k) {
//info!("STARTS AFTER");
self.left = Some(Ready(Some(Ok(k))));
self.curbin += 1;
let range = self.spec.get_range(self.curbin);
let ret = self
.aggtor
.replace(I::aggregator_new_static(range.beg, range.end))
.unwrap()
.result();
//Ready(Some(Ok(ret)))
self.tmp_agg_results = ret.into();
if k.is_range_complete() {
self.range_complete = true;
continue 'outer;
} else {
//info!("INGEST");
let mut k = k;
ag.ingest(&mut k);
// if this input also contains data after the current bin, then we need to keep
// it for the next round.
if ag.ends_after(&k) {
//info!("ENDS AFTER");
let ag = self.aggtor.as_mut().unwrap();
if ag.ends_before(&k) {
//info!("ENDS BEFORE");
continue 'outer;
} else if ag.starts_after(&k) {
//info!("STARTS AFTER");
self.left = Some(Ready(Some(Ok(k))));
self.curbin += 1;
let range = self.spec.get_range(self.curbin);
@@ -152,8 +150,28 @@ where
self.tmp_agg_results = ret.into();
continue 'outer;
} else {
//info!("ENDS WITHIN");
continue 'outer;
//info!("INGEST");
let mut k = k;
ag.ingest(&mut k);
// if this input also contains data after the current bin, then we need to keep
// it for the next round.
if ag.ends_after(&k) {
//info!("ENDS AFTER");
self.left = Some(Ready(Some(Ok(k))));
self.curbin += 1;
let range = self.spec.get_range(self.curbin);
let ret = self
.aggtor
.replace(I::aggregator_new_static(range.beg, range.end))
.unwrap()
.result();
//Ready(Some(Ok(ret)))
self.tmp_agg_results = ret.into();
continue 'outer;
} else {
//info!("ENDS WITHIN");
continue 'outer;
}
}
}
}
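Aside, not part of the diff: the hunks above change the T-binning stream so that an incoming RangeComplete marker is no longer fed into the aggregator. It only sets the new range_complete flag, and once the input is exhausted a single marker is emitted, provided the output item type can represent one (otherwise only a warning is logged). Below is a self-contained, synchronous sketch of that deferral using a hypothetical Item enum instead of the crate's traits; the real code is generic and stream-based.

// Hypothetical item type; the real code works on generic stream items.
#[derive(Debug, PartialEq)]
enum Item {
    Value(u64, f32),
    RangeComplete,
}

// Bin values by integer-dividing the timestamp; defer the marker to the end.
fn bin_by(items: impl IntoIterator<Item = Item>, bin_len: u64) -> Vec<Item> {
    let mut out = Vec::new();
    let mut range_complete = false;
    let mut cur_bin: Option<(u64, f32, u32)> = None; // (bin index, sum, count)
    for it in items {
        match it {
            Item::RangeComplete => range_complete = true,
            Item::Value(ts, v) => {
                let b = ts / bin_len;
                match cur_bin {
                    Some((cb, sum, n)) if cb == b => cur_bin = Some((cb, sum + v, n + 1)),
                    Some((cb, sum, n)) => {
                        // Close the previous bin with its average, start the next one.
                        out.push(Item::Value(cb * bin_len, sum / n as f32));
                        cur_bin = Some((b, v, 1));
                    }
                    None => cur_bin = Some((b, v, 1)),
                }
            }
        }
    }
    if let Some((cb, sum, n)) = cur_bin {
        out.push(Item::Value(cb * bin_len, sum / n as f32));
    }
    if range_complete {
        // Emitted exactly once, after all data, mirroring range_complete_emitted.
        out.push(Item::RangeComplete);
    }
    out
}

fn main() {
    let inp = vec![Item::Value(1, 1.0), Item::Value(2, 3.0), Item::RangeComplete, Item::Value(11, 5.0)];
    let out = bin_by(inp, 10);
    assert_eq!(out.last(), Some(&Item::RangeComplete));
    println!("{:?}", out);
}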

View File

@@ -108,12 +108,18 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch {
impl AggregatableTdim for MinMaxAvgScalarEventBatch {
type Output = MinMaxAvgScalarBinBatch;
type Aggregator = MinMaxAvgScalarEventBatchAggregator;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2)
}
fn is_range_complete(&self) -> bool {
false
}
fn make_range_complete_item() -> Option<Self> {
None
}
}
impl MinMaxAvgScalarEventBatch {
@@ -286,6 +292,10 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem {
false
}
}
fn make_range_complete_item() -> Option<Self> {
Some(MinMaxAvgScalarEventBatchStreamItem::RangeComplete)
}
}
pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {
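The two impls above illustrate the split introduced by the new trait methods: an item type with a RangeComplete variant reports it via is_range_complete and can manufacture one via make_range_complete_item, while a bare batch type opts out with false / None. A minimal, self-contained sketch of that contract, using a hypothetical trait and types rather than the crate's real ones:

// Sketch only: a marker-capable item type vs. a plain batch type (names are hypothetical).
trait RangeCompleteItem: Sized {
    fn is_range_complete(&self) -> bool;
    fn make_range_complete_item() -> Option<Self>;
}

enum EventStreamItem {
    Values(Vec<u64>),
    RangeComplete,
}

impl RangeCompleteItem for EventStreamItem {
    fn is_range_complete(&self) -> bool {
        matches!(self, EventStreamItem::RangeComplete)
    }
    fn make_range_complete_item() -> Option<Self> {
        Some(EventStreamItem::RangeComplete)
    }
}

// A bare batch has no way to express the marker, so it opts out with None.
struct PlainBatch {
    tss: Vec<u64>,
}

impl RangeCompleteItem for PlainBatch {
    fn is_range_complete(&self) -> bool {
        false
    }
    fn make_range_complete_item() -> Option<Self> {
        None
    }
}

fn main() {
    assert!(EventStreamItem::make_range_complete_item().is_some());
    assert!(PlainBatch::make_range_complete_item().is_none());
}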

View File

@@ -197,6 +197,10 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatch {
fn is_range_complete(&self) -> bool {
false
}
fn make_range_complete_item() -> Option<Self> {
None
}
}
pub struct MinMaxAvgScalarBinBatchAggregator {
@@ -308,6 +312,10 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
false
}
}
fn make_range_complete_item() -> Option<Self> {
Some(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)
}
}
impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem {
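Both the T-binning stream in the first file (range_complete / range_complete_emitted) and the merger in the next file (range_complete_observed_all / range_complete_observed_all_emitted) keep a pair of flags so that the completion marker is emitted exactly once, after the data. A tiny sketch of that emit-once guard; the helper type is hypothetical and not part of the crate:

// Emit-once guard: observe() may be called many times, take_emit() fires once.
#[derive(Default)]
struct EmitOnce {
    observed: bool,
    emitted: bool,
}

impl EmitOnce {
    fn observe(&mut self) {
        self.observed = true;
    }
    // Returns true exactly once, and only after observe() was called.
    fn take_emit(&mut self) -> bool {
        if self.observed && !self.emitted {
            self.emitted = true;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut g = EmitOnce::default();
    assert!(!g.take_emit());
    g.observe();
    assert!(g.take_emit());
    assert!(!g.take_emit());
}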

View File

@@ -161,12 +161,13 @@ where
range_complete_observed: Vec<bool>,
range_complete_observed_all: bool,
range_complete_observed_all_emitted: bool,
data_emit_complete: bool,
batch_size: usize,
}
impl<S> MergedMinMaxAvgScalarStream<S>
where
S: Stream<Item = Result<MinMaxAvgScalarEventBatchStreamItem, Error>>,
S: Stream<Item = Result<MinMaxAvgScalarEventBatchStreamItem, Error>> + Unpin,
{
pub fn new(inps: Vec<S>) -> Self {
let n = inps.len();
@@ -185,42 +186,23 @@ where
range_complete_observed: vec![false; n],
range_complete_observed_all: false,
range_complete_observed_all_emitted: false,
data_emit_complete: false,
batch_size: 64,
}
}
}
impl<S> Stream for MergedMinMaxAvgScalarStream<S>
where
S: Stream<Item = Result<MinMaxAvgScalarEventBatchStreamItem, Error>> + Unpin,
{
type Item = Result<MinMaxAvgScalarEventBatchStreamItem, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// Replenish the per-input current values:
// does nothing if every input already has a Val or is Finished.
// If some input is still None, poll it; upstream may return Pending,
// in which case the caller also wants to bail out and return Pending.
fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Error>> {
use Poll::*;
'outer: loop {
if self.completed {
panic!("MergedMinMaxAvgScalarStream poll_next on completed");
}
if self.errored {
self.completed = true;
return Ready(None);
}
if self.range_complete_observed_all {
error!("MERGER NOTE range_complete_observed_all");
if self.range_complete_observed_all_emitted {
// TODO anything left to do? Returning None may be too early if there are stats left.
} else {
error!("MERGER NOTE range_complete_observed_all EMIT NOW");
self.range_complete_observed_all_emitted = true;
return Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::RangeComplete)));
}
}
// can only run logic if all streams are either finished, errored or have some current value.
for i1 in 0..self.inps.len() {
match self.current[i1] {
MergedMinMaxAvgScalarStreamCurVal::None => {
match self.inps[i1].poll_next_unpin(cx) {
let mut pending = 0;
for i1 in 0..self.inps.len() {
match self.current[i1] {
MergedMinMaxAvgScalarStreamCurVal::None => {
'l1: loop {
break match self.inps[i1].poll_next_unpin(cx) {
Ready(Some(Ok(k))) => match k {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => {
self.ixs[i1] = 0;
@@ -235,9 +217,10 @@ where
} else {
info!("\n\n:::::: range_complete d {}", d);
}
continue 'outer;
continue 'l1;
}
MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_stats) => {
// TODO also merge the stats: either just sum them, or sum them up per input index.
todo!();
}
},
@@ -245,86 +228,137 @@ where
// TODO emit this error and consider this stream done; anything more to do here?
//self.current[i1] = CurVal::Err(e);
self.errored = true;
return Ready(Some(Err(e)));
return Ready(Err(e));
}
Ready(None) => {
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Finish;
}
Pending => {
return Pending;
pending += 1;
}
}
};
}
_ => (),
}
_ => (),
}
let mut lowest_ix = usize::MAX;
let mut lowest_ts = u64::MAX;
for i1 in 0..self.inps.len() {
match &self.current[i1] {
MergedMinMaxAvgScalarStreamCurVal::Finish => {}
MergedMinMaxAvgScalarStreamCurVal::Val(val) => {
let u = self.ixs[i1];
if u >= val.tss.len() {
self.ixs[i1] = 0;
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::None;
continue 'outer;
} else {
let ts = val.tss[u];
if ts < lowest_ts {
lowest_ix = i1;
lowest_ts = ts;
}
}
}
if pending > 0 {
Pending
} else {
Ready(Ok(()))
}
}
}
impl<S> Stream for MergedMinMaxAvgScalarStream<S>
where
S: Stream<Item = Result<MinMaxAvgScalarEventBatchStreamItem, Error>> + Unpin,
{
type Item = Result<MinMaxAvgScalarEventBatchStreamItem, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
if self.completed {
panic!("MergedMinMaxAvgScalarStream poll_next on completed");
}
if self.errored {
self.completed = true;
return Ready(None);
}
'outer: loop {
break if self.data_emit_complete {
error!("MERGER NOTE data_emit_complete");
if self.range_complete_observed_all {
error!("MERGER NOTE range_complete_observed_all");
if self.range_complete_observed_all_emitted {
error!("MERGER NOTE range_complete_observed_all_emitted");
// NOTE everything else (data and stats) must be emitted before data_emit_complete gets set.
self.completed = true;
Ready(None)
} else {
error!("MERGER NOTE range_complete_observed_all EMIT NOW");
self.range_complete_observed_all_emitted = true;
// NOTE this branch is supposed to return the RangeComplete item (exactly once)
Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::RangeComplete)))
}
_ => panic!(),
}
}
if lowest_ix == usize::MAX {
if self.batch.tss.len() != 0 {
let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
if self.range_complete_observed_all {
// TODO do not emit RangeComplete here; instead, potentially emit
// a RangeComplete at the very end, once data and stats are emitted and all inputs have finished.
err::todo();
}
info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(Some( current batch ))");
let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
break Ready(Some(Ok(ret)));
} else {
if self.range_complete_observed_all {
// TODO do not emit RangeComplete here; instead, potentially emit
// a RangeComplete at the very end, once data and stats are emitted and all inputs have finished.
err::todo();
}
info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(None)");
self.completed = true;
break Ready(None);
Ready(None)
}
} else {
//trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix);
assert!(lowest_ts >= self.ts_last_emit);
self.ts_last_emit = lowest_ts;
self.batch.tss.push(lowest_ts);
let rix = self.ixs[lowest_ix];
let z = match &self.current[lowest_ix] {
MergedMinMaxAvgScalarStreamCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len()),
_ => panic!(),
};
self.batch.mins.push(z.0);
self.batch.maxs.push(z.1);
self.batch.avgs.push(z.2);
self.ixs[lowest_ix] += 1;
if self.ixs[lowest_ix] >= z.3 {
self.ixs[lowest_ix] = 0;
self.current[lowest_ix] = MergedMinMaxAvgScalarStreamCurVal::None;
// Can only run logic if all streams are either finished, errored or have some current value.
match self.replenish(cx) {
Ready(Ok(_)) => {
let mut lowest_ix = usize::MAX;
let mut lowest_ts = u64::MAX;
for i1 in 0..self.inps.len() {
match &self.current[i1] {
MergedMinMaxAvgScalarStreamCurVal::Finish => {}
MergedMinMaxAvgScalarStreamCurVal::Val(val) => {
let u = self.ixs[i1];
if u >= val.tss.len() {
self.ixs[i1] = 0;
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::None;
continue 'outer;
} else {
let ts = val.tss[u];
if ts < lowest_ts {
lowest_ix = i1;
lowest_ts = ts;
}
}
}
_ => panic!(),
}
}
if lowest_ix == usize::MAX {
if self.batch.tss.len() != 0 {
let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(Some( current batch ))");
let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
self.data_emit_complete = true;
Ready(Some(Ok(ret)))
} else {
info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(None)");
self.data_emit_complete = true;
continue 'outer;
}
} else {
//trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix);
assert!(lowest_ts >= self.ts_last_emit);
self.ts_last_emit = lowest_ts;
self.batch.tss.push(lowest_ts);
let rix = self.ixs[lowest_ix];
let z = match &self.current[lowest_ix] {
MergedMinMaxAvgScalarStreamCurVal::Val(k) => {
(k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len())
}
_ => panic!(),
};
self.batch.mins.push(z.0);
self.batch.maxs.push(z.1);
self.batch.avgs.push(z.2);
self.ixs[lowest_ix] += 1;
if self.ixs[lowest_ix] >= z.3 {
self.ixs[lowest_ix] = 0;
self.current[lowest_ix] = MergedMinMaxAvgScalarStreamCurVal::None;
}
if self.batch.tss.len() >= self.batch_size {
let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
Ready(Some(Ok(ret)))
} else {
continue 'outer;
}
}
}
Ready(Err(e)) => {
self.errored = true;
Ready(Some(Err(e)))
}
Pending => Pending,
}
}
if self.batch.tss.len() >= self.batch_size {
let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
return Ready(Some(Ok(ret)));
}
};
}
}
}
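For illustration (a sketch under assumptions, not the merger's actual implementation): the merged stream above is, at its core, a k-way merge over per-input event batches that are each sorted by timestamp. replenish refills any input whose current batch is exhausted; poll_next then repeatedly picks the input with the lowest current timestamp, appends that entry to the output batch, advances only that input, and flushes whenever batch_size is reached. RangeComplete and stats handling are layered on top. A synchronous analogue of just the merge-and-batch core, over plain vectors (merge_sorted is a hypothetical helper, not the crate's API):

// Each input: a list of (timestamp, value) pairs, already sorted by timestamp.
fn merge_sorted(inputs: &[Vec<(u64, f32)>], batch_size: usize) -> Vec<Vec<(u64, f32)>> {
    let mut ixs = vec![0usize; inputs.len()];
    let mut batches = Vec::new();
    let mut batch = Vec::new();
    loop {
        // Pick the input with the lowest current timestamp (mirrors lowest_ix / lowest_ts).
        let mut lowest: Option<(usize, u64)> = None;
        for (i, inp) in inputs.iter().enumerate() {
            if let Some(&(ts, _)) = inp.get(ixs[i]) {
                if lowest.map_or(true, |(_, lts)| ts < lts) {
                    lowest = Some((i, ts));
                }
            }
        }
        match lowest {
            Some((i, _)) => {
                batch.push(inputs[i][ixs[i]]);
                ixs[i] += 1;
                // Flush a full batch, like the batch_size check in poll_next.
                if batch.len() >= batch_size {
                    batches.push(std::mem::take(&mut batch));
                }
            }
            None => {
                // All inputs exhausted: emit the remainder, then stop.
                if !batch.is_empty() {
                    batches.push(batch);
                }
                return batches;
            }
        }
    }
}

fn main() {
    let a: Vec<(u64, f32)> = vec![(1, 1.0), (4, 4.0)];
    let b: Vec<(u64, f32)> = vec![(2, 2.0), (3, 3.0), (5, 5.0)];
    let merged = merge_sorted(&[a, b], 64);
    let tss: Vec<u64> = merged.concat().iter().map(|&(ts, _)| ts).collect();
    assert_eq!(tss, vec![1, 2, 3, 4, 5]);
    println!("{:?}", merged);
}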