Towards filling in more missing pieces

This commit is contained in:
Dominik Werder
2021-05-04 18:24:30 +02:00
parent bf08893a98
commit be36dcce89
6 changed files with 73 additions and 66 deletions

View File

@@ -4,7 +4,7 @@ Aggregation and binning support.
use super::eventchunker::EventFull;
use crate::agg::eventbatch::{MinMaxAvgScalarEventBatch, MinMaxAvgScalarEventBatchAggregator};
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchAggregator};
use crate::eventchunker::EventChunkerItem;
use err::Error;
use futures_core::Stream;
@@ -448,7 +448,7 @@ where
Ready(Some(Err(e)))
}
},
EventChunkerItem::RangeComplete => err::todoval(),
EventChunkerItem::RangeComplete => Ready(Some(Ok(Dim1F32StreamItem::RangeComplete))),
EventChunkerItem::EventDataReadStats(_stats) => {
// TODO ret.event_data_read_stats.trans(&mut k.event_data_read_stats);
// TODO ret.values_extract_stats.dur += inst2.duration_since(inst1);
@@ -506,7 +506,7 @@ impl AggregatableXdim1Bin for Dim1F32StreamItem {
}
}
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub enum MinMaxAvgScalarBinBatchStreamItem {
Values(MinMaxAvgScalarBinBatch),
RangeComplete,
@@ -519,7 +519,7 @@ pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {
}
impl MinMaxAvgScalarEventBatchStreamItemAggregator {
pub fn new2(ts1: u64, ts2: u64) -> Self {
pub fn new(ts1: u64, ts2: u64) -> Self {
let agg = <MinMaxAvgScalarEventBatch as AggregatableTdim>::aggregator_new_static(ts1, ts2);
Self {
agg,
@@ -581,7 +581,7 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem {
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
//<Self as AggregatableTdim>::Aggregator::new(ts1, ts2)
Self::Aggregator::new2(ts1, ts2)
Self::Aggregator::new(ts1, ts2)
}
}
@@ -593,30 +593,65 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem {
}
}
pub struct MinMaxAvgScalarBinBatchStreamItemAggregator {}
/// Time-dimension aggregator over `MinMaxAvgScalarBinBatchStreamItem` values.
///
/// Pairs the plain bin-batch aggregator with a separate accumulator for
/// event-data read statistics.
pub struct MinMaxAvgScalarBinBatchStreamItemAggregator {
// Inner aggregator for the bin-batch payloads.
agg: MinMaxAvgScalarBinBatchAggregator,
// Read statistics kept alongside the inner aggregator.
event_data_read_stats: EventDataReadStats,
}
impl MinMaxAvgScalarBinBatchStreamItemAggregator {
    /// Creates an aggregator for the range given by `ts1` and `ts2`.
    ///
    /// Both timestamps are passed unchanged to the static aggregator
    /// constructor of `MinMaxAvgScalarBinBatch`; the read-statistics
    /// accumulator starts from `EventDataReadStats::new()`.
    pub fn new(ts1: u64, ts2: u64) -> Self {
        Self {
            agg: <MinMaxAvgScalarBinBatch as AggregatableTdim>::aggregator_new_static(ts1, ts2),
            event_data_read_stats: EventDataReadStats::new(),
        }
    }
}
impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
type InputValue = MinMaxAvgScalarBinBatchStreamItem;
type OutputValue = MinMaxAvgScalarBinBatchStreamItem;
fn ends_before(&self, _inp: &Self::InputValue) -> bool {
todo!()
/// Forwards the `ends_before` check to the inner bin-batch aggregator for
/// `Values` items.
///
/// # Panics
///
/// Any other variant reaches `todo!()` and panics; handling for non-value
/// items is not implemented yet.
fn ends_before(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_before(vals),
_ => todo!(),
}
}
fn ends_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
/// Forwards the `ends_after` check to the inner bin-batch aggregator for
/// `Values` items.
///
/// # Panics
///
/// Any other variant reaches `todo!()` and panics; handling for non-value
/// items is not implemented yet.
fn ends_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_after(vals),
_ => todo!(),
}
}
fn starts_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
/// Forwards the `starts_after` check to the inner bin-batch aggregator for
/// `Values` items.
///
/// # Panics
///
/// Any other variant reaches `todo!()` and panics; handling for non-value
/// items is not implemented yet.
fn starts_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.starts_after(vals),
_ => todo!(),
}
}
fn ingest(&mut self, _inp: &mut Self::InputValue) {
todo!()
/// Consumes one stream item.
///
/// * `Values` batches are handed to the inner bin-batch aggregator.
/// * `EventDataReadStats` items are passed to `trans` on the accumulator
///   held by this aggregator.
///
/// # Panics
///
/// Panics (without a message) when given `RangeComplete`.
fn ingest(&mut self, inp: &mut Self::InputValue) {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals),
MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats),
// NOTE(review): bare panic on RangeComplete - confirm that callers filter
// this variant out before ingest, or add real handling here.
MinMaxAvgScalarBinBatchStreamItem::RangeComplete => panic!(),
}
}
fn result(self) -> Vec<Self::OutputValue> {
todo!()
let mut ret: Vec<Self::OutputValue> = self
.agg
.result()
.into_iter()
.map(MinMaxAvgScalarBinBatchStreamItem::Values)
.collect();
ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(
self.event_data_read_stats,
));
ret
}
}
@@ -624,8 +659,8 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
type Output = MinMaxAvgScalarBinBatchStreamItem;
type Aggregator = MinMaxAvgScalarBinBatchStreamItemAggregator;
fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator {
todo!()
/// Builds the aggregator for this item type by forwarding `ts1` and `ts2`
/// unchanged to `Self::Aggregator::new`.
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
Self::Aggregator::new(ts1, ts2)
}
}

View File

@@ -152,9 +152,6 @@ pub struct MinMaxAvgScalarEventBatchAggregator {
min: f32,
max: f32,
sum: f32,
event_data_read_stats: EventDataReadStats,
values_extract_stats: ValuesExtractStats,
range_complete_observed: bool,
}
impl MinMaxAvgScalarEventBatchAggregator {
@@ -166,9 +163,6 @@ impl MinMaxAvgScalarEventBatchAggregator {
max: f32::MIN,
sum: 0f32,
count: 0,
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
range_complete_observed: false,
}
}
}
@@ -209,11 +203,6 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
v.tss.last().map(|k| k / SEC),
);
}
self.event_data_read_stats.trans(&mut v.event_data_read_stats);
self.values_extract_stats.trans(&mut v.values_extract_stats);
if v.range_complete_observed {
self.range_complete_observed = true;
}
for i1 in 0..v.tss.len() {
let ts = v.tss[i1];
if ts < self.ts1 {
@@ -250,7 +239,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
}
}
fn result(mut self) -> Vec<Self::OutputValue> {
fn result(self) -> Vec<Self::OutputValue> {
let min = if self.min == f32::MAX { f32::NAN } else { self.min };
let max = if self.max == f32::MIN { f32::NAN } else { self.max };
let avg = if self.count == 0 {
@@ -265,9 +254,6 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
mins: vec![min],
maxs: vec![max],
avgs: vec![avg],
event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()),
values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()),
range_complete_observed: self.range_complete_observed,
};
vec![v]
}

View File

@@ -1,8 +1,8 @@
use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside, ValuesExtractStats};
use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside};
use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{EventDataReadStats, NanoRange};
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use std::mem::size_of;
@@ -15,9 +15,6 @@ pub struct MinMaxAvgScalarBinBatch {
pub mins: Vec<f32>,
pub maxs: Vec<f32>,
pub avgs: Vec<f32>,
pub event_data_read_stats: EventDataReadStats,
pub values_extract_stats: ValuesExtractStats,
pub range_complete_observed: bool,
}
impl MinMaxAvgScalarBinBatch {
@@ -29,9 +26,6 @@ impl MinMaxAvgScalarBinBatch {
mins: vec![],
maxs: vec![],
avgs: vec![],
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
range_complete_observed: false,
}
}
@@ -120,15 +114,12 @@ impl std::fmt::Debug for MinMaxAvgScalarBinBatch {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
fmt,
"MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?} EDS {:?} VXS {:?} COMP {}",
"MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?}",
self.ts1s.len(),
self.ts1s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.ts2s.iter().map(|k| k / SEC).collect::<Vec<_>>(),
self.counts,
self.avgs,
self.event_data_read_stats,
self.values_extract_stats,
self.range_complete_observed,
)
}
}
@@ -210,9 +201,6 @@ pub struct MinMaxAvgScalarBinBatchAggregator {
max: f32,
sum: f32,
sumc: u64,
event_data_read_stats: EventDataReadStats,
values_extract_stats: ValuesExtractStats,
range_complete_observed: bool,
}
impl MinMaxAvgScalarBinBatchAggregator {
@@ -225,9 +213,6 @@ impl MinMaxAvgScalarBinBatchAggregator {
max: f32::MIN,
sum: 0f32,
sumc: 0,
event_data_read_stats: EventDataReadStats::new(),
values_extract_stats: ValuesExtractStats::new(),
range_complete_observed: false,
}
}
}
@@ -258,11 +243,6 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
}
fn ingest(&mut self, v: &mut Self::InputValue) {
self.event_data_read_stats.trans(&mut v.event_data_read_stats);
self.values_extract_stats.trans(&mut v.values_extract_stats);
if v.range_complete_observed {
self.range_complete_observed = true;
}
for i1 in 0..v.ts1s.len() {
let ts1 = v.ts1s[i1];
let ts2 = v.ts2s[i1];
@@ -280,7 +260,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
}
}
fn result(mut self) -> Vec<Self::OutputValue> {
fn result(self) -> Vec<Self::OutputValue> {
let min = if self.min == f32::MAX { f32::NAN } else { self.min };
let max = if self.max == f32::MIN { f32::NAN } else { self.max };
let avg = if self.sumc == 0 {
@@ -295,9 +275,6 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
mins: vec![min],
maxs: vec![max],
avgs: vec![avg],
event_data_read_stats: std::mem::replace(&mut self.event_data_read_stats, EventDataReadStats::new()),
values_extract_stats: std::mem::replace(&mut self.values_extract_stats, ValuesExtractStats::new()),
range_complete_observed: self.range_complete_observed,
};
vec![v]
}

View File

@@ -1,4 +1,4 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::MinMaxAvgScalarBinBatchStreamItem;
use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream};
use crate::cache::{CacheUsage, PreBinnedQuery};
use err::Error;
@@ -12,7 +12,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
pub struct BinnedStream {
inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatch, Error>> + Send>>,
inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>,
}
impl BinnedStream {
@@ -57,10 +57,14 @@ impl BinnedStream {
Fits::Inside
| Fits::PartlyGreater
| Fits::PartlyLower
| Fits::PartlyLowerAndGreater => Some(Ok(k)),
| Fits::PartlyLowerAndGreater => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k))),
_ => None,
}
}
Ok(PreBinnedItem::RangeComplete) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)),
Ok(PreBinnedItem::EventDataReadStats(stats)) => {
Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)))
}
Err(e) => {
error!("observe error in stream {:?}", e);
Some(Err(e))
@@ -69,7 +73,7 @@ impl BinnedStream {
ready(g)
}
})
.map(|k| k)
//.map(|k| k)
.into_binned_t(range);
Self { inp: Box::pin(inp) }
}
@@ -77,7 +81,7 @@ impl BinnedStream {
impl Stream for BinnedStream {
// TODO make this generic over all possible things
type Item = Result<MinMaxAvgScalarBinBatch, Error>;
type Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.inp.poll_next_unpin(cx)

View File

@@ -7,7 +7,7 @@ use futures_core::Stream;
use futures_util::{pin_mut, FutureExt};
#[allow(unused_imports)]
use netpod::log::*;
use netpod::NodeConfigCached;
use netpod::{EventDataReadStats, NodeConfigCached};
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -47,6 +47,9 @@ impl PreBinnedValueFetchedStream {
/// Serializable item with a data variant and two out-of-band variants.
///
/// NOTE(review): presumably the item type yielded by
/// `PreBinnedValueFetchedStream` - confirm against its `Stream` impl.
#[derive(Serialize, Deserialize)]
pub enum PreBinnedItem {
/// A batch of min/max/avg bins.
Batch(MinMaxAvgScalarBinBatch),
/// Marker without payload.
/// NOTE(review): presumably signals that the requested range was fully
/// covered - confirm with the producer.
RangeComplete,
/// Event-data read statistics carried as a separate item.
EventDataReadStats(EventDataReadStats),
// Not enabled yet: a variant carrying values-extract statistics.
//ValuesExtractStats(ValuesExtractStats),
}
impl Stream for PreBinnedValueFetchedStream {

View File

@@ -1,6 +1,7 @@
use crate::spawn_test_hosts;
use bytes::BytesMut;
use chrono::{DateTime, Utc};
use disk::agg::MinMaxAvgScalarBinBatchStreamItem;
use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use err::Error;
use futures_util::StreamExt;
@@ -162,10 +163,11 @@ where
.fold(Ok(BinnedResponse::new()), |a, k| {
let g = match a {
Ok(mut a) => match k {
Ok(k) => {
Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k)) => {
a.bin_count += k.ts1s.len();
Ok(a)
}
Ok(_) => Ok(a),
Err(e) => Err(e),
},
Err(e) => Err(e),