diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index f9baea8..592abf8 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -4,7 +4,7 @@ Aggregation and binning support.
 use super::eventchunker::EventFull;
 use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchAggregator};
-use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
+use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator};
 use crate::eventchunker::EventChunkerItem;
 use err::Error;
 use futures_core::Stream;
@@ -448,7 +448,7 @@ where
                     Ready(Some(Err(e)))
                 }
             },
-            EventChunkerItem::RangeComplete => err::todoval(),
+            EventChunkerItem::RangeComplete => Ready(Some(Ok(Dim1F32StreamItem::RangeComplete))),
             EventChunkerItem::EventDataReadStats(_stats) => {
                 // TODO ret.event_data_read_stats.trans(&mut k.event_data_read_stats);
                 // TODO ret.values_extract_stats.dur += inst2.duration_since(inst1);
@@ -506,7 +506,7 @@ impl AggregatableXdim1Bin for Dim1F32StreamItem {
     }
 }

-#[derive(Debug)]
+#[derive(Debug, Serialize, Deserialize)]
 pub enum MinMaxAvgScalarBinBatchStreamItem {
     Values(MinMaxAvgScalarBinBatch),
     RangeComplete,
@@ -519,7 +519,7 @@ pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {
 }

 impl MinMaxAvgScalarEventBatchStreamItemAggregator {
-    pub fn new2(ts1: u64, ts2: u64) -> Self {
+    pub fn new(ts1: u64, ts2: u64) -> Self {
         let agg = <MinMaxAvgScalarEventBatch as AggregatableTdim>::aggregator_new_static(ts1, ts2);
         Self {
             agg,
@@ -581,7 +581,7 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem {

     fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
         //::Aggregator::new(ts1, ts2)
-        Self::Aggregator::new2(ts1, ts2)
+        Self::Aggregator::new(ts1, ts2)
     }
 }

@@ -593,30 +593,65 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem {
     }
 }

-pub struct MinMaxAvgScalarBinBatchStreamItemAggregator {}
+pub struct MinMaxAvgScalarBinBatchStreamItemAggregator {
+    agg: MinMaxAvgScalarBinBatchAggregator,
+    event_data_read_stats: EventDataReadStats,
+}
+
+impl MinMaxAvgScalarBinBatchStreamItemAggregator {
+    pub fn new(ts1: u64, ts2: u64) -> Self {
+        let agg = <MinMaxAvgScalarBinBatch as AggregatableTdim>::aggregator_new_static(ts1, ts2);
+        Self {
+            agg,
+            event_data_read_stats: EventDataReadStats::new(),
+        }
+    }
+}

 impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
     type InputValue = MinMaxAvgScalarBinBatchStreamItem;
     type OutputValue = MinMaxAvgScalarBinBatchStreamItem;

-    fn ends_before(&self, _inp: &Self::InputValue) -> bool {
-        todo!()
+    fn ends_before(&self, inp: &Self::InputValue) -> bool {
+        match inp {
+            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_before(vals),
+            _ => todo!(),
+        }
     }

-    fn ends_after(&self, _inp: &Self::InputValue) -> bool {
-        todo!()
+    fn ends_after(&self, inp: &Self::InputValue) -> bool {
+        match inp {
+            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_after(vals),
+            _ => todo!(),
+        }
     }

-    fn starts_after(&self, _inp: &Self::InputValue) -> bool {
-        todo!()
+    fn starts_after(&self, inp: &Self::InputValue) -> bool {
+        match inp {
+            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.starts_after(vals),
+            _ => todo!(),
+        }
     }

-    fn ingest(&mut self, _inp: &mut Self::InputValue) {
-        todo!()
+    fn ingest(&mut self, inp: &mut Self::InputValue) {
+        match inp {
+            MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals),
+            MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats),
+            MinMaxAvgScalarBinBatchStreamItem::RangeComplete => panic!(),
+        }
     }

     fn result(self) -> Vec<Self::OutputValue> {
-        todo!()
+        let mut ret: Vec<Self::OutputValue> = self
+            .agg
+            .result()
+            .into_iter()
+            .map(MinMaxAvgScalarBinBatchStreamItem::Values)
+            .collect();
+        ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(
+            self.event_data_read_stats,
+        ));
+        ret
     }
 }

@@ -624,8 +659,8 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
     type Output = MinMaxAvgScalarBinBatchStreamItem;
     type Aggregator = MinMaxAvgScalarBinBatchStreamItemAggregator;

-    fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator {
-        todo!()
+    fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
+        Self::Aggregator::new(ts1, ts2)
     }
 }

diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs
index d5b2c7d..cb3d069 100644
--- a/disk/src/agg/eventbatch.rs
+++ b/disk/src/agg/eventbatch.rs
@@ -152,9 +152,6 @@ pub struct MinMaxAvgScalarEventBatchAggregator {
     min: f32,
     max: f32,
     sum: f32,
-    event_data_read_stats: EventDataReadStats,
-    values_extract_stats: ValuesExtractStats,
-    range_complete_observed: bool,
 }

 impl MinMaxAvgScalarEventBatchAggregator {
@@ -166,9 +163,6 @@ impl MinMaxAvgScalarEventBatchAggregator {
             max: f32::MIN,
             sum: 0f32,
             count: 0,
-            event_data_read_stats: EventDataReadStats::new(),
-            values_extract_stats: ValuesExtractStats::new(),
-            range_complete_observed: false,
         }
     }
 }
@@ -209,11 +203,6 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
                 v.tss.last().map(|k| k / SEC),
             );
         }
-        self.event_data_read_stats.trans(&mut v.event_data_read_stats);
-        self.values_extract_stats.trans(&mut v.values_extract_stats);
-        if v.range_complete_observed {
-            self.range_complete_observed = true;
-        }
         for i1 in 0..v.tss.len() {
             let ts = v.tss[i1];
             if ts < self.ts1 {
@@ -250,7 +239,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
         }
     }

-    fn result(mut self) -> Vec<Self::OutputValue> {
+    fn result(self) -> Vec<Self::OutputValue> {
         let min = if self.min == f32::MAX { f32::NAN } else { self.min };
         let max = if self.max == f32::MIN { f32::NAN } else { self.max };
         let avg = if self.count == 0 {
@@ -265,9 +254,6 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
             mins: vec![min],
             maxs: vec![max],
             avgs: vec![avg],
-            event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()),
-            values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()),
-            range_complete_observed: self.range_complete_observed,
         };
         vec![v]
     }
diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs
index 5db9356..3f09407 100644
--- a/disk/src/agg/scalarbinbatch.rs
+++ b/disk/src/agg/scalarbinbatch.rs
@@ -1,8 +1,8 @@
-use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside, ValuesExtractStats};
+use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside};
 use bytes::{BufMut, Bytes, BytesMut};
 use netpod::log::*;
 use netpod::timeunits::SEC;
-use netpod::{EventDataReadStats, NanoRange};
+use netpod::NanoRange;
 use serde::{Deserialize, Serialize};
 use std::mem::size_of;

@@ -15,9 +15,6 @@ pub struct MinMaxAvgScalarBinBatch {
     pub mins: Vec<f32>,
     pub maxs: Vec<f32>,
     pub avgs: Vec<f32>,
-    pub event_data_read_stats: EventDataReadStats,
-    pub values_extract_stats: ValuesExtractStats,
-    pub range_complete_observed: bool,
 }

 impl MinMaxAvgScalarBinBatch {
@@ -29,9 +26,6 @@ impl MinMaxAvgScalarBinBatch {
             mins: vec![],
             maxs: vec![],
             avgs: vec![],
-            event_data_read_stats: EventDataReadStats::new(),
-            values_extract_stats: ValuesExtractStats::new(),
-            range_complete_observed: false,
         }
     }

@@ -120,15 +114,12 @@ impl std::fmt::Debug for MinMaxAvgScalarBinBatch {
     fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(
             fmt,
-            "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?} EDS {:?} VXS {:?} COMP {}",
+            "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?}",
             self.ts1s.len(),
             self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
             self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
             self.counts,
             self.avgs,
-            self.event_data_read_stats,
-            self.values_extract_stats,
-            self.range_complete_observed,
         )
     }
 }
@@ -210,9 +201,6 @@ pub struct MinMaxAvgScalarBinBatchAggregator {
     max: f32,
     sum: f32,
     sumc: u64,
-    event_data_read_stats: EventDataReadStats,
-    values_extract_stats: ValuesExtractStats,
-    range_complete_observed: bool,
 }

 impl MinMaxAvgScalarBinBatchAggregator {
@@ -225,9 +213,6 @@ impl MinMaxAvgScalarBinBatchAggregator {
             max: f32::MIN,
             sum: 0f32,
             sumc: 0,
-            event_data_read_stats: EventDataReadStats::new(),
-            values_extract_stats: ValuesExtractStats::new(),
-            range_complete_observed: false,
         }
     }
 }
@@ -258,11 +243,6 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
     }

     fn ingest(&mut self, v: &mut Self::InputValue) {
-        self.event_data_read_stats.trans(&mut v.event_data_read_stats);
-        self.values_extract_stats.trans(&mut v.values_extract_stats);
-        if v.range_complete_observed {
-            self.range_complete_observed = true;
-        }
         for i1 in 0..v.ts1s.len() {
             let ts1 = v.ts1s[i1];
             let ts2 = v.ts2s[i1];
@@ -280,7 +260,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
         }
     }

-    fn result(mut self) -> Vec<Self::OutputValue> {
+    fn result(self) -> Vec<Self::OutputValue> {
         let min = if self.min == f32::MAX { f32::NAN } else { self.min };
         let max = if self.max == f32::MIN { f32::NAN } else { self.max };
         let avg = if self.sumc == 0 {
@@ -295,9 +275,6 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
             mins: vec![min],
             maxs: vec![max],
             avgs: vec![avg],
-            event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()),
-            values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()),
-            range_complete_observed: self.range_complete_observed,
         };
         vec![v]
     }
diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs
index 05f606b..139f65f 100644
--- a/disk/src/binnedstream.rs
+++ b/disk/src/binnedstream.rs
@@ -1,4 +1,4 @@
-use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
+use crate::agg::MinMaxAvgScalarBinBatchStreamItem;
 use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream};
 use crate::cache::{CacheUsage, PreBinnedQuery};
 use err::Error;
@@ -12,7 +12,7 @@ use std::pin::Pin;
 use std::task::{Context, Poll};

 pub struct BinnedStream {
-    inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatch, Error>> + Send>>,
+    inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>,
 }

 impl BinnedStream {
@@ -57,10 +57,14 @@ impl BinnedStream {
                         Fits::Inside
                         | Fits::PartlyGreater
                         | Fits::PartlyLower
-                        | Fits::PartlyLowerAndGreater => Some(Ok(k)),
+                        | Fits::PartlyLowerAndGreater => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k))),
                         _ => None,
                     }
                 }
+                Ok(PreBinnedItem::RangeComplete) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)),
+                Ok(PreBinnedItem::EventDataReadStats(stats)) => {
+                    Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)))
+                }
                 Err(e) => {
                     error!("observe error in stream {:?}", e);
                     Some(Err(e))
@@ -69,7 +73,7 @@ impl BinnedStream {
                     ready(g)
                 }
             })
-            .map(|k| k)
+            //.map(|k| k)
             .into_binned_t(range);
         Self { inp: Box::pin(inp) }
     }
 }
@@ -77,7 +81,7 @@ impl Stream for BinnedStream {
     // TODO make this generic over all possible things
-    type Item = Result<MinMaxAvgScalarBinBatch, Error>;
+    type Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>;

     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         self.inp.poll_next_unpin(cx)
diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs
index 825f91f..ebf24e7 100644
--- a/disk/src/cache/pbvfs.rs
+++ b/disk/src/cache/pbvfs.rs
@@ -7,7 +7,7 @@ use futures_core::Stream;
 use futures_util::{pin_mut, FutureExt};
 #[allow(unused_imports)]
 use netpod::log::*;
-use netpod::NodeConfigCached;
+use netpod::{EventDataReadStats, NodeConfigCached};
 use serde::{Deserialize, Serialize};
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -47,6 +47,9 @@ impl PreBinnedValueFetchedStream {
 #[derive(Serialize, Deserialize)]
 pub enum PreBinnedItem {
     Batch(MinMaxAvgScalarBinBatch),
+    RangeComplete,
+    EventDataReadStats(EventDataReadStats),
+    //ValuesExtractStats(ValuesExtractStats),
 }

 impl Stream for PreBinnedValueFetchedStream {
diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs
index f28bd6c..114baaa 100644
--- a/retrieval/src/test.rs
+++ b/retrieval/src/test.rs
@@ -1,6 +1,7 @@
 use crate::spawn_test_hosts;
 use bytes::BytesMut;
 use chrono::{DateTime, Utc};
+use disk::agg::MinMaxAvgScalarBinBatchStreamItem;
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
 use err::Error;
 use futures_util::StreamExt;
@@ -162,10 +163,11 @@ where
         .fold(Ok(BinnedResponse::new()), |a, k| {
             let g = match a {
                 Ok(mut a) => match k {
-                    Ok(k) => {
+                    Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k)) => {
                         a.bin_count += k.ts1s.len();
                         Ok(a)
                     }
+                    Ok(_) => Ok(a),
                     Err(e) => Err(e),
                 },
                 Err(e) => Err(e),
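
Not part of the diff: below is a minimal, self-contained Rust sketch of the stream-item pattern the changes above introduce, where value batches, read statistics, and a range-complete marker travel as variants of one enum and the aggregator routes each variant separately. All names here (DemoItem, DemoStats, DemoAggregator) are hypothetical stand-ins, not the real types from the disk crate.

// Illustrative sketch only; simplified stand-ins for EventDataReadStats,
// MinMaxAvgScalarBinBatchStreamItem and its aggregator.

#[derive(Debug, Default)]
struct DemoStats {
    parsed_bytes: u64,
}

impl DemoStats {
    // Merge another stats record into this one, draining the source,
    // similar in spirit to EventDataReadStats::trans in the diff.
    fn trans(&mut self, other: &mut DemoStats) {
        self.parsed_bytes += other.parsed_bytes;
        other.parsed_bytes = 0;
    }
}

#[derive(Debug)]
enum DemoItem {
    Values(Vec<f32>),
    EventDataReadStats(DemoStats),
    RangeComplete,
}

#[derive(Default)]
struct DemoAggregator {
    sum: f32,
    count: u64,
    stats: DemoStats,
    range_complete: bool,
}

impl DemoAggregator {
    // Route each variant: values feed the numeric aggregation, stats are merged
    // on the side. (The diff currently panics on RangeComplete during ingest;
    // this sketch records a flag instead.)
    fn ingest(&mut self, item: &mut DemoItem) {
        match item {
            DemoItem::Values(vals) => {
                self.count += vals.len() as u64;
                self.sum += vals.iter().sum::<f32>();
            }
            DemoItem::EventDataReadStats(stats) => self.stats.trans(stats),
            DemoItem::RangeComplete => self.range_complete = true,
        }
    }

    // Emit the aggregated values first, then forward the accumulated stats,
    // mirroring how result() in the diff assembles its output vector.
    fn result(self) -> Vec<DemoItem> {
        let avg = if self.count == 0 {
            f32::NAN
        } else {
            self.sum / self.count as f32
        };
        let mut out = vec![DemoItem::Values(vec![avg])];
        out.push(DemoItem::EventDataReadStats(self.stats));
        if self.range_complete {
            out.push(DemoItem::RangeComplete);
        }
        out
    }
}

fn main() {
    let mut agg = DemoAggregator::default();
    let items = vec![
        DemoItem::Values(vec![1.0, 2.0, 3.0]),
        DemoItem::EventDataReadStats(DemoStats { parsed_bytes: 4096 }),
        DemoItem::RangeComplete,
    ];
    for mut item in items {
        agg.ingest(&mut item);
    }
    println!("{:?}", agg.result());
}

Routing the statistics through the item enum keeps the per-bin payload types free of bookkeeping fields, which is what the removals in eventbatch.rs and scalarbinbatch.rs above accomplish.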