diff --git a/disk/src/agg.rs b/disk/src/agg.rs index f343954..f9baea8 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -3,7 +3,7 @@ Aggregation and binning support. */ use super::eventchunker::EventFull; -use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; +use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchAggregator}; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::eventchunker::EventChunkerItem; use err::Error; @@ -25,12 +25,12 @@ pub mod scalarbinbatch; pub trait AggregatorTdim { type InputValue; - type OutputValue: AggregatableXdim1Bin + AggregatableTdim; + type OutputValue: AggregatableXdim1Bin + AggregatableTdim + Unpin; fn ends_before(&self, inp: &Self::InputValue) -> bool; fn ends_after(&self, inp: &Self::InputValue) -> bool; fn starts_after(&self, inp: &Self::InputValue) -> bool; fn ingest(&mut self, inp: &mut Self::InputValue); - fn result(self) -> Self::OutputValue; + fn result(self) -> Vec; } pub trait AggregatableXdim1Bin { @@ -499,7 +499,10 @@ impl AggregatableXdim1Bin for Dim1F32StreamItem { type Output = MinMaxAvgScalarEventBatchStreamItem; fn into_agg(self) -> Self::Output { - todo!() + match self { + Dim1F32StreamItem::Values(vals) => MinMaxAvgScalarEventBatchStreamItem::Values(vals.into_agg()), + _ => panic!(), + } } } @@ -510,30 +513,65 @@ pub enum MinMaxAvgScalarBinBatchStreamItem { EventDataReadStats(EventDataReadStats), } -pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {} +pub struct MinMaxAvgScalarEventBatchStreamItemAggregator { + agg: MinMaxAvgScalarEventBatchAggregator, + event_data_read_stats: EventDataReadStats, +} + +impl MinMaxAvgScalarEventBatchStreamItemAggregator { + pub fn new2(ts1: u64, ts2: u64) -> Self { + let agg = ::aggregator_new_static(ts1, ts2); + Self { + agg, + event_data_read_stats: EventDataReadStats::new(), + } + } +} impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator { type InputValue = MinMaxAvgScalarEventBatchStreamItem; type OutputValue = MinMaxAvgScalarBinBatchStreamItem; - fn ends_before(&self, _inp: &Self::InputValue) -> bool { - todo!() + fn ends_before(&self, inp: &Self::InputValue) -> bool { + match inp { + MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_before(vals), + _ => todo!(), + } } - fn ends_after(&self, _inp: &Self::InputValue) -> bool { - todo!() + fn ends_after(&self, inp: &Self::InputValue) -> bool { + match inp { + MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_after(vals), + _ => todo!(), + } } - fn starts_after(&self, _inp: &Self::InputValue) -> bool { - todo!() + fn starts_after(&self, inp: &Self::InputValue) -> bool { + match inp { + MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.starts_after(vals), + _ => todo!(), + } } - fn ingest(&mut self, _inp: &mut Self::InputValue) { - todo!() + fn ingest(&mut self, inp: &mut Self::InputValue) { + match inp { + MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals), + MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats), + MinMaxAvgScalarEventBatchStreamItem::RangeComplete => panic!(), + } } - fn result(self) -> Self::OutputValue { - todo!() + fn result(self) -> Vec { + let mut ret: Vec = self + .agg + .result() + .into_iter() + .map(MinMaxAvgScalarBinBatchStreamItem::Values) + .collect(); + ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats( + self.event_data_read_stats, + )); + ret } } @@ -541,8 +579,9 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem { type Output = MinMaxAvgScalarBinBatchStreamItem; type Aggregator = MinMaxAvgScalarEventBatchStreamItemAggregator; - fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator { - todo!() + fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { + //::Aggregator::new(ts1, ts2) + Self::Aggregator::new2(ts1, ts2) } } @@ -576,7 +615,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator { todo!() } - fn result(self) -> Self::OutputValue { + fn result(self) -> Vec { todo!() } } diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 1582a57..0d52c04 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -4,6 +4,7 @@ use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; use netpod::BinnedRange; +use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; @@ -38,6 +39,7 @@ where errored: bool, completed: bool, inp_completed: bool, + tmp_agg_results: VecDeque<::OutputValue>, } impl IntoBinnedTDefaultStream @@ -56,14 +58,15 @@ where errored: false, completed: false, inp_completed: false, + tmp_agg_results: VecDeque::new(), } } } -impl Stream for IntoBinnedTDefaultStream +impl Stream for IntoBinnedTDefaultStream where I: AggregatableTdim + Unpin, - T: Stream> + Unpin, + S: Stream> + Unpin, I::Aggregator: Unpin, { type Item = Result<::OutputValue, Error>; @@ -78,6 +81,9 @@ where return Ready(None); } 'outer: loop { + if let Some(item) = self.tmp_agg_results.pop_front() { + return Ready(Some(Ok(item))); + } let cur = if let Some(k) = self.left.take() { k } else if self.inp_completed { @@ -102,7 +108,9 @@ where .replace(I::aggregator_new_static(range.beg, range.end)) .unwrap() .result(); - Ready(Some(Ok(ret))) + //Ready(Some(Ok(ret))) + self.tmp_agg_results = ret.into(); + continue 'outer; } else { //info!("INGEST"); let mut k = k; @@ -119,7 +127,9 @@ where .replace(I::aggregator_new_static(range.beg, range.end)) .unwrap() .result(); - Ready(Some(Ok(ret))) + //Ready(Some(Ok(ret))) + self.tmp_agg_results = ret.into(); + continue 'outer; } else { //info!("ENDS WITHIN"); continue 'outer; @@ -140,7 +150,12 @@ where self.curbin += 1; let range = self.spec.get_range(self.curbin); match self.aggtor.replace(I::aggregator_new_static(range.beg, range.end)) { - Some(ag) => Ready(Some(Ok(ag.result()))), + Some(ag) => { + let ret = ag.result(); + //Ready(Some(Ok(ag.result()))) + self.tmp_agg_results = ret.into(); + continue 'outer; + } None => { panic!(); } diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 91ceebc..d5b2c7d 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -250,7 +250,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { } } - fn result(mut self) -> Self::OutputValue { + fn result(mut self) -> Vec { let min = if self.min == f32::MAX { f32::NAN } else { self.min }; let max = if self.max == f32::MIN { f32::NAN } else { self.max }; let avg = if self.count == 0 { @@ -258,7 +258,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { } else { self.sum / self.count as f32 }; - MinMaxAvgScalarBinBatch { + let v = MinMaxAvgScalarBinBatch { ts1s: vec![self.ts1], ts2s: vec![self.ts2], counts: vec![self.count], @@ -268,6 +268,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()), values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()), range_complete_observed: self.range_complete_observed, - } + }; + vec![v] } } diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index 4e832d9..5db9356 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -280,7 +280,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { } } - fn result(mut self) -> Self::OutputValue { + fn result(mut self) -> Vec { let min = if self.min == f32::MAX { f32::NAN } else { self.min }; let max = if self.max == f32::MIN { f32::NAN } else { self.max }; let avg = if self.sumc == 0 { @@ -288,7 +288,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { } else { self.sum / self.sumc as f32 }; - MinMaxAvgScalarBinBatch { + let v = MinMaxAvgScalarBinBatch { ts1s: vec![self.ts1], ts2s: vec![self.ts2], counts: vec![self.count], @@ -298,6 +298,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()), values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()), range_complete_observed: self.range_complete_observed, - } + }; + vec![v] } } diff --git a/err/src/lib.rs b/err/src/lib.rs index e07cf70..183f9e6 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -210,6 +210,10 @@ impl From for Error { } } +pub fn todo() { + todo!("TODO"); +} + pub fn todoval() -> T { todo!("TODO todoval") }