From 226c2ac6f311f3da6c9c00715ae06a04bf0ce984 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 5 May 2021 14:44:08 +0200 Subject: [PATCH] WIP on merger --- disk/src/agg/binnedt.rs | 80 +++++++----- disk/src/agg/eventbatch.rs | 10 ++ disk/src/agg/scalarbinbatch.rs | 8 ++ disk/src/merge.rs | 230 +++++++++++++++++++-------------- 4 files changed, 199 insertions(+), 129 deletions(-) diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 01afab0..0d5aff6 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -18,11 +18,12 @@ pub trait AggregatorTdim { fn result(self) -> Vec; } -pub trait AggregatableTdim { +pub trait AggregatableTdim: Sized { type Output: AggregatableXdim1Bin + AggregatableTdim; type Aggregator: AggregatorTdim; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator; fn is_range_complete(&self) -> bool; + fn make_range_complete_item() -> Option; } pub trait IntoBinnedT { @@ -52,6 +53,8 @@ where aggtor: Option, spec: BinnedRange, curbin: u32, + range_complete: bool, + range_complete_emitted: bool, left: Option>>>, errored: bool, completed: bool, @@ -71,6 +74,8 @@ where aggtor: Some(I::aggregator_new_static(range.beg, range.end)), spec, curbin: 0, + range_complete: false, + range_complete_emitted: false, left: None, errored: false, completed: false, @@ -104,42 +109,35 @@ where let cur = if let Some(k) = self.left.take() { k } else if self.inp_completed { - Ready(None) + if self.range_complete { + self.range_complete_emitted = true; + // TODO why can't I declare that type? + //type TT = ::OutputValue; + if let Some(k) = ::OutputValue::make_range_complete_item() { + return Ready(Some(Ok(k))); + } else { + warn!("IntoBinnedTDefaultStream should emit RangeComplete but I doesn't have one"); + Ready(None) + } + } else { + Ready(None) + } } else { let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) }; break match cur { Ready(Some(Ok(k))) => { - // TODO need some trait to know whether the incoming item is a RangeComplete - - err::todo(); - - let ag = self.aggtor.as_mut().unwrap(); - if ag.ends_before(&k) { - //info!("ENDS BEFORE"); - continue 'outer; - } else if ag.starts_after(&k) { - //info!("STARTS AFTER"); - self.left = Some(Ready(Some(Ok(k)))); - self.curbin += 1; - let range = self.spec.get_range(self.curbin); - let ret = self - .aggtor - .replace(I::aggregator_new_static(range.beg, range.end)) - .unwrap() - .result(); - //Ready(Some(Ok(ret))) - self.tmp_agg_results = ret.into(); + if k.is_range_complete() { + self.range_complete = true; continue 'outer; } else { - //info!("INGEST"); - let mut k = k; - ag.ingest(&mut k); - // if this input contains also data after the current bin, then I need to keep - // it for the next round. - if ag.ends_after(&k) { - //info!("ENDS AFTER"); + let ag = self.aggtor.as_mut().unwrap(); + if ag.ends_before(&k) { + //info!("ENDS BEFORE"); + continue 'outer; + } else if ag.starts_after(&k) { + //info!("STARTS AFTER"); self.left = Some(Ready(Some(Ok(k)))); self.curbin += 1; let range = self.spec.get_range(self.curbin); @@ -152,8 +150,28 @@ where self.tmp_agg_results = ret.into(); continue 'outer; } else { - //info!("ENDS WITHIN"); - continue 'outer; + //info!("INGEST"); + let mut k = k; + ag.ingest(&mut k); + // if this input contains also data after the current bin, then I need to keep + // it for the next round. + if ag.ends_after(&k) { + //info!("ENDS AFTER"); + self.left = Some(Ready(Some(Ok(k)))); + self.curbin += 1; + let range = self.spec.get_range(self.curbin); + let ret = self + .aggtor + .replace(I::aggregator_new_static(range.beg, range.end)) + .unwrap() + .result(); + //Ready(Some(Ok(ret))) + self.tmp_agg_results = ret.into(); + continue 'outer; + } else { + //info!("ENDS WITHIN"); + continue 'outer; + } } } } diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 1a2ae27..5c0dfe9 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -108,12 +108,18 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch { impl AggregatableTdim for MinMaxAvgScalarEventBatch { type Output = MinMaxAvgScalarBinBatch; type Aggregator = MinMaxAvgScalarEventBatchAggregator; + fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { MinMaxAvgScalarEventBatchAggregator::new(ts1, ts2) } + fn is_range_complete(&self) -> bool { false } + + fn make_range_complete_item() -> Option { + None + } } impl MinMaxAvgScalarEventBatch { @@ -286,6 +292,10 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem { false } } + + fn make_range_complete_item() -> Option { + Some(MinMaxAvgScalarEventBatchStreamItem::RangeComplete) + } } pub struct MinMaxAvgScalarEventBatchStreamItemAggregator { diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index 2399b30..ef4b504 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -197,6 +197,10 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatch { fn is_range_complete(&self) -> bool { false } + + fn make_range_complete_item() -> Option { + None + } } pub struct MinMaxAvgScalarBinBatchAggregator { @@ -308,6 +312,10 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem { false } } + + fn make_range_complete_item() -> Option { + Some(MinMaxAvgScalarBinBatchStreamItem::RangeComplete) + } } impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem { diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 9a4e4fa..1ddbd51 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -161,12 +161,13 @@ where range_complete_observed: Vec, range_complete_observed_all: bool, range_complete_observed_all_emitted: bool, + data_emit_complete: bool, batch_size: usize, } impl MergedMinMaxAvgScalarStream where - S: Stream>, + S: Stream> + Unpin, { pub fn new(inps: Vec) -> Self { let n = inps.len(); @@ -185,42 +186,23 @@ where range_complete_observed: vec![false; n], range_complete_observed_all: false, range_complete_observed_all_emitted: false, + data_emit_complete: false, batch_size: 64, } } -} -impl Stream for MergedMinMaxAvgScalarStream -where - S: Stream> + Unpin, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + // This can: + // Do nothing if all have Val or Finished. + // But if some is None: + // We might get some Pending from upstream. In that case, caller also wants to abort here. + fn replenish(self: &mut Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - 'outer: loop { - if self.completed { - panic!("MergedMinMaxAvgScalarStream poll_next on completed"); - } - if self.errored { - self.completed = true; - return Ready(None); - } - if self.range_complete_observed_all { - error!("MERGER NOTE range_complete_observed_all"); - if self.range_complete_observed_all_emitted { - // TODO something to do? Returning None is maybe too early if there is stats left. - } else { - error!("MERGER NOTE range_complete_observed_all EMIT NOW"); - self.range_complete_observed_all_emitted = true; - return Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::RangeComplete))); - } - } - // can only run logic if all streams are either finished, errored or have some current value. - for i1 in 0..self.inps.len() { - match self.current[i1] { - MergedMinMaxAvgScalarStreamCurVal::None => { - match self.inps[i1].poll_next_unpin(cx) { + let mut pending = 0; + for i1 in 0..self.inps.len() { + match self.current[i1] { + MergedMinMaxAvgScalarStreamCurVal::None => { + 'l1: loop { + break match self.inps[i1].poll_next_unpin(cx) { Ready(Some(Ok(k))) => match k { MinMaxAvgScalarEventBatchStreamItem::Values(vals) => { self.ixs[i1] = 0; @@ -235,9 +217,10 @@ where } else { info!("\n\n:::::: range_complete d {}", d); } - continue 'outer; + continue 'l1; } MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_stats) => { + // TODO merge also the stats: either just sum, or sum up by input index. todo!(); } }, @@ -245,86 +228,137 @@ where // TODO emit this error, consider this stream as done, anything more to do here? //self.current[i1] = CurVal::Err(e); self.errored = true; - return Ready(Some(Err(e))); + return Ready(Err(e)); } Ready(None) => { self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Finish; } Pending => { - return Pending; + pending += 1; } - } + }; } - _ => (), } + _ => (), } - let mut lowest_ix = usize::MAX; - let mut lowest_ts = u64::MAX; - for i1 in 0..self.inps.len() { - match &self.current[i1] { - MergedMinMaxAvgScalarStreamCurVal::Finish => {} - MergedMinMaxAvgScalarStreamCurVal::Val(val) => { - let u = self.ixs[i1]; - if u >= val.tss.len() { - self.ixs[i1] = 0; - self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::None; - continue 'outer; - } else { - let ts = val.tss[u]; - if ts < lowest_ts { - lowest_ix = i1; - lowest_ts = ts; - } - } + } + if pending > 0 { + Pending + } else { + Ready(Ok(())) + } + } +} + +impl Stream for MergedMinMaxAvgScalarStream +where + S: Stream> + Unpin, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + if self.completed { + panic!("MergedMinMaxAvgScalarStream poll_next on completed"); + } + if self.errored { + self.completed = true; + return Ready(None); + } + 'outer: loop { + break if self.data_emit_complete { + error!("MERGER NOTE data_emit_complete"); + if self.range_complete_observed_all { + error!("MERGER NOTE range_complete_observed_all"); + if self.range_complete_observed_all_emitted { + error!("MERGER NOTE range_complete_observed_all_emitted"); + // NOTE everything else (data and stats) must be emitted before data_emit_complete gets set. + self.completed = true; + Ready(None) + } else { + error!("MERGER NOTE range_complete_observed_all EMIT NOW"); + self.range_complete_observed_all_emitted = true; + // NOTE this is supposed to return + Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::RangeComplete))) } - _ => panic!(), - } - } - if lowest_ix == usize::MAX { - if self.batch.tss.len() != 0 { - let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); - if self.range_complete_observed_all { - // TODO we don't want to emit range complete here, instead we want to emit potentially - // a RangeComplete at the very end when data and stats are emitted and all inputs finished. - err::todo(); - } - info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(Some( current batch ))"); - let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); - break Ready(Some(Ok(ret))); } else { - if self.range_complete_observed_all { - // TODO we don't want to emit range complete here, instead we want to emit potentially - // a RangeComplete at the very end when data and stats are emitted and all inputs finished. - err::todo(); - } - info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(None)"); self.completed = true; - break Ready(None); + Ready(None) } } else { - //trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix); - assert!(lowest_ts >= self.ts_last_emit); - self.ts_last_emit = lowest_ts; - self.batch.tss.push(lowest_ts); - let rix = self.ixs[lowest_ix]; - let z = match &self.current[lowest_ix] { - MergedMinMaxAvgScalarStreamCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len()), - _ => panic!(), - }; - self.batch.mins.push(z.0); - self.batch.maxs.push(z.1); - self.batch.avgs.push(z.2); - self.ixs[lowest_ix] += 1; - if self.ixs[lowest_ix] >= z.3 { - self.ixs[lowest_ix] = 0; - self.current[lowest_ix] = MergedMinMaxAvgScalarStreamCurVal::None; + // Can only run logic if all streams are either finished, errored or have some current value. + match self.replenish(cx) { + Ready(Ok(_)) => { + let mut lowest_ix = usize::MAX; + let mut lowest_ts = u64::MAX; + for i1 in 0..self.inps.len() { + match &self.current[i1] { + MergedMinMaxAvgScalarStreamCurVal::Finish => {} + MergedMinMaxAvgScalarStreamCurVal::Val(val) => { + let u = self.ixs[i1]; + if u >= val.tss.len() { + self.ixs[i1] = 0; + self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::None; + continue 'outer; + } else { + let ts = val.tss[u]; + if ts < lowest_ts { + lowest_ix = i1; + lowest_ts = ts; + } + } + } + _ => panic!(), + } + } + if lowest_ix == usize::MAX { + if self.batch.tss.len() != 0 { + let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); + info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(Some( current batch ))"); + let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); + self.data_emit_complete = true; + Ready(Some(Ok(ret))) + } else { + info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(None)"); + self.data_emit_complete = true; + continue 'outer; + } + } else { + //trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix); + assert!(lowest_ts >= self.ts_last_emit); + self.ts_last_emit = lowest_ts; + self.batch.tss.push(lowest_ts); + let rix = self.ixs[lowest_ix]; + let z = match &self.current[lowest_ix] { + MergedMinMaxAvgScalarStreamCurVal::Val(k) => { + (k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len()) + } + _ => panic!(), + }; + self.batch.mins.push(z.0); + self.batch.maxs.push(z.1); + self.batch.avgs.push(z.2); + self.ixs[lowest_ix] += 1; + if self.ixs[lowest_ix] >= z.3 { + self.ixs[lowest_ix] = 0; + self.current[lowest_ix] = MergedMinMaxAvgScalarStreamCurVal::None; + } + if self.batch.tss.len() >= self.batch_size { + let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); + let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); + Ready(Some(Ok(ret))) + } else { + continue 'outer; + } + } + } + Ready(Err(e)) => { + self.errored = true; + Ready(Some(Err(e))) + } + Pending => Pending, } - } - if self.batch.tss.len() >= self.batch_size { - let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); - let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); - return Ready(Some(Ok(ret))); - } + }; } } }