From e312db7ac2d48a3a102dae2b1fbb87022667df26 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 26 May 2021 15:20:28 +0200 Subject: [PATCH] More concrete xbinned to tbinned aggregator --- disk/src/agg/binnedt3.rs | 185 ++++++++++++++++++++++++++++++++++--- disk/src/agg/eventbatch.rs | 26 +++++- disk/src/binned.rs | 9 +- disk/src/cache/pbv.rs | 5 +- 4 files changed, 205 insertions(+), 20 deletions(-) diff --git a/disk/src/agg/binnedt3.rs b/disk/src/agg/binnedt3.rs index 83a0e1d..d1690d9 100644 --- a/disk/src/agg/binnedt3.rs +++ b/disk/src/agg/binnedt3.rs @@ -1,10 +1,12 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; -use crate::binned::{BinnedStreamKind, RangeCompletableItem}; +use crate::binned::{BinnedStreamKind, RangeCompletableItem, RangeOverlapInfo}; use err::Error; use futures_core::Stream; -use netpod::BinnedRange; +use futures_util::StreamExt; +use netpod::log::*; +use netpod::{BinnedRange, NanoRange}; use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; @@ -14,7 +16,71 @@ pub trait Aggregator3Tdim { type OutputValue; } -pub struct Agg3 {} +pub struct Agg3 { + range: NanoRange, + count: u64, + min: f32, + max: f32, + sum: f32, + sumc: u64, +} + +impl Agg3 { + fn new(range: NanoRange) -> Self { + Self { + range, + count: 0, + min: f32::MAX, + max: f32::MIN, + sum: f32::NAN, + sumc: 0, + } + } + + fn ingest(&mut self, item: &mut MinMaxAvgScalarEventBatch) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + if ts < self.range.beg { + continue; + } else if ts >= self.range.end { + continue; + } else { + self.count += 1; + self.min = self.min.min(item.mins[i1]); + self.max = self.max.max(item.maxs[i1]); + let x = item.avgs[i1]; + if x.is_nan() { + } else { + if self.sum.is_nan() { + self.sum = x; + } else { + self.sum += x; + } + self.sumc += 1; + } + } + } + } + + fn result(self) -> Vec { + let min = if self.min == f32::MAX { f32::NAN } else { self.min }; + let max = if self.max == f32::MIN { f32::NAN } else { self.max }; + let avg = if self.sumc == 0 { + f32::NAN + } else { + self.sum / self.sumc as f32 + }; + let v = MinMaxAvgScalarBinBatch { + ts1s: vec![self.range.beg], + ts2s: vec![self.range.end], + counts: vec![self.count], + mins: vec![min], + maxs: vec![max], + avgs: vec![avg], + }; + vec![v] + } +} impl Aggregator3Tdim for Agg3 { type InputValue = MinMaxAvgScalarEventBatch; @@ -23,17 +89,16 @@ impl Aggregator3Tdim for Agg3 { pub struct BinnedT3Stream { // TODO get rid of box: - inp: Pin + Send>>, + inp: Pin>, Error>> + Send>>, //aggtor: Option<<::XBinnedEvents as AggregatableTdim>::Aggregator>, - aggtor: Option<()>, + aggtor: Option, spec: BinnedRange, curbin: u32, inp_completed: bool, all_bins_emitted: bool, range_complete_observed: bool, range_complete_emitted: bool, - //left: Option::XBinnedEvents>>, Error>>>>, - left: Option<()>, + left: Option>, Error>>>>, errored: bool, completed: bool, tmp_agg_results: VecDeque, @@ -42,13 +107,12 @@ pub struct BinnedT3Stream { impl BinnedT3Stream { pub fn new(inp: S, spec: BinnedRange) -> Self where - S: Stream + Send + 'static, + S: Stream>, Error>> + Send + 'static, { - // TODO simplify here, get rid of numeric parameter: let range = spec.get_range(0); Self { inp: Box::pin(inp), - aggtor: None, + aggtor: Some(Agg3::new(range)), spec, curbin: 0, inp_completed: false, @@ -61,6 +125,100 @@ impl BinnedT3Stream { tmp_agg_results: VecDeque::new(), } } + + fn cur( + &mut self, + cx: &mut Context, + ) -> Poll>, Error>>> { + if let Some(cur) = self.left.take() { + cur + } else if self.inp_completed { + Poll::Ready(None) + } else { + let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); + inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) + } + } + + fn cycle_current_bin(&mut self) { + self.curbin += 1; + let range = self.spec.get_range(self.curbin); + let ret = self + .aggtor + .replace(Agg3::new(range)) + // TODO handle None case, or remove Option if Agg is always present + .unwrap() + .result(); + self.tmp_agg_results = VecDeque::from(ret); + if self.curbin >= self.spec.count as u32 { + self.all_bins_emitted = true; + } + } + + fn handle( + &mut self, + cur: Poll>, Error>>>, + ) -> Option>, Error>>>> { + use Poll::*; + match cur { + Ready(Some(Ok(item))) => match item { + StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))), + StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))), + StreamItem::DataItem(item) => match item { + RangeCompletableItem::RangeComplete => { + self.range_complete_observed = true; + None + } + RangeCompletableItem::Data(item) => { + if self.all_bins_emitted { + // Just drop the item because we will not emit anymore data. + // Could also at least gather some stats. + None + } else { + let ag = self.aggtor.as_mut().unwrap(); + if item.ends_before(ag.range.clone()) { + None + } else if item.starts_after(ag.range.clone()) { + self.left = + Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); + self.cycle_current_bin(); + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None + } else { + let mut item = item; + err::todo(); + // TODO ingest the data into Agg3 + let item = item; + //if ag.ends_after(&item) { + if item.ends_after(ag.range.clone()) { + self.left = + Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))); + self.cycle_current_bin(); + } + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None + } + } + } + }, + }, + Ready(Some(Err(e))) => { + self.errored = true; + Some(Ready(Some(Err(e)))) + } + Ready(None) => { + self.inp_completed = true; + if self.all_bins_emitted { + None + } else { + self.cycle_current_bin(); + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None + } + } + Pending => Some(Pending), + } + } } impl Stream for BinnedT3Stream { @@ -87,14 +245,11 @@ impl Stream for BinnedT3Stream { continue 'outer; } } else { - err::todo(); - Pending - // TODO `cur` and `handle` are not yet taken over from binnedt.rs - /*let cur = self.cur(cx); + let cur = self.cur(cx); match self.handle(cur) { Some(item) => item, None => continue 'outer, - }*/ + } }; } } diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 779ff67..e4ad482 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -2,11 +2,12 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::StreamItem; use crate::agg::AggregatableXdim1Bin; -use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem}; +use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo}; use crate::frame::makeframe::make_frame; use bytes::{BufMut, Bytes, BytesMut}; use err::Error; use netpod::log::*; +use netpod::NanoRange; use serde::{Deserialize, Serialize}; use std::mem::size_of; @@ -255,3 +256,26 @@ impl MakeBytesFrame for Result bool { + match self.tss.last() { + Some(&ts) => ts < range.beg, + None => true, + } + } + + fn ends_after(&self, range: NanoRange) -> bool { + match self.tss.last() { + Some(&ts) => ts >= range.end, + None => panic!(), + } + } + + fn starts_after(&self, range: NanoRange) -> bool { + match self.tss.first() { + Some(&ts) => ts >= range.end, + None => panic!(), + } + } +} diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 38b903e..468a057 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -480,6 +480,12 @@ impl Collectable for MinMaxAvgScalarBinBatch { } } +pub trait RangeOverlapInfo { + fn ends_before(&self, range: NanoRange) -> bool; + fn ends_after(&self, range: NanoRange) -> bool; + fn starts_after(&self, range: NanoRange) -> bool; +} + pub trait XBinnedEvents: Sized + Unpin @@ -539,7 +545,8 @@ pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static type XBinnedEvents: XBinnedEvents; type TBinnedBins: TBinnedBins; type XBinnedToTBinnedAggregator; - type XBinnedToTBinnedStream; + type XBinnedToTBinnedStream: Stream>, Error>> + + Send; fn new_binned_from_prebinned( &self, diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index a5463d2..7f3aeab 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -167,9 +167,8 @@ where self.node_config.node_config.cluster.clone(), self.stream_kind.clone(), ); - let s1 = crate::agg::binnedt::IntoBinnedT::::into_binned_t(s1, range); - //self.fut2 = Some(Box::pin(s1)); - self.fut2 = err::todoval(); + let s1 = ::xbinned_to_tbinned(s1, range); + self.fut2 = Some(Box::pin(s1)); } fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) {