From 9e9f33e086d915d0cbf03e0e4beaa1d431e6ba20 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 24 Sep 2024 14:08:24 +0200 Subject: [PATCH] WIP --- crates/items_2/src/binning.rs | 2 + crates/items_2/src/binning/aggregator.rs | 65 ++++-- crates/items_2/src/binning/binnedvaluetype.rs | 8 + crates/items_2/src/binning/container_bins.rs | 191 ++++++++++++++++++ .../items_2/src/binning/container_events.rs | 31 ++- crates/items_2/src/binning/test/events00.rs | 3 +- .../binning/timeweight/timeweight_events.rs | 114 +++++++---- crates/items_2/src/binning/valuetype.rs | 31 ++- 8 files changed, 379 insertions(+), 66 deletions(-) create mode 100644 crates/items_2/src/binning/binnedvaluetype.rs create mode 100644 crates/items_2/src/binning/container_bins.rs diff --git a/crates/items_2/src/binning.rs b/crates/items_2/src/binning.rs index 8924855..f480ad1 100644 --- a/crates/items_2/src/binning.rs +++ b/crates/items_2/src/binning.rs @@ -1,4 +1,6 @@ pub mod aggregator; +pub mod binnedvaluetype; +pub mod container_bins; pub mod container_events; pub mod timeweight; pub mod valuetype; diff --git a/crates/items_2/src/binning/aggregator.rs b/crates/items_2/src/binning/aggregator.rs index c85cf3c..18993af 100644 --- a/crates/items_2/src/binning/aggregator.rs +++ b/crates/items_2/src/binning/aggregator.rs @@ -1,3 +1,5 @@ +use super::binnedvaluetype::BinnedNumericValue; +use super::binnedvaluetype::BinnedValueType; use super::container_events::EventValueType; use netpod::DtNano; @@ -5,9 +7,12 @@ pub trait AggregatorTimeWeight where EVT: EventValueType, { + type OutputAvg; + fn new() -> Self; - fn reset_for_new_bin(&mut self); fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT); + fn reset_for_new_bin(&mut self); + fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg; } pub struct AggregatorNumeric { @@ -22,49 +27,77 @@ impl AggregatorTimeWeight for AggregatorNumeric where EVT: AggWithSame, { - fn new() -> Self { - todo!() - } + type OutputAvg = EVT; - fn reset_for_new_bin(&mut self) { - todo!() + fn new() -> Self { + Self { + sum: EVT::identity_sum(), + } } fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT) { - todo!() + let f = dt.ns() as f32 / bl.ns() as f32; + eprintln!("INGEST {} {:?}", f, val); + self.sum.add_weighted(&val, f); + } + + fn reset_for_new_bin(&mut self) { + self.sum = EVT::identity_sum(); + } + + fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg { + let ret = self.sum.clone(); + self.sum = EVT::identity_sum(); + ret } } impl AggregatorTimeWeight for AggregatorNumeric { + type OutputAvg = f32; + fn new() -> Self { Self { - sum: f32::sum_identity(), + sum: f32::identity_sum(), } } - fn reset_for_new_bin(&mut self) { - self.sum = f32::sum_identity(); - } - fn ingest(&mut self, dt: DtNano, bl: DtNano, val: f32) { let f = dt.ns() as f32 / bl.ns() as f32; eprintln!("INGEST {} {}", f, val); self.sum += f * val; } + + fn reset_for_new_bin(&mut self) { + self.sum = f32::identity_sum(); + } + + fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg { + let ret = self.sum.clone(); + self.sum = f32::identity_sum(); + ret + } } impl AggregatorTimeWeight for AggregatorNumeric { - fn new() -> Self { - todo!() - } + type OutputAvg = u64; - fn reset_for_new_bin(&mut self) { + fn new() -> Self { todo!() } fn ingest(&mut self, dt: DtNano, bl: DtNano, val: u64) { todo!() } + + fn reset_for_new_bin(&mut self) { + self.sum = u64::identity_sum(); + } + + fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg { + let ret = self.sum.clone(); + self.sum = u64::identity_sum(); + ret + } } // TODO do enum right from begin, using a SOA enum container. diff --git a/crates/items_2/src/binning/binnedvaluetype.rs b/crates/items_2/src/binning/binnedvaluetype.rs new file mode 100644 index 0000000..306fed4 --- /dev/null +++ b/crates/items_2/src/binning/binnedvaluetype.rs @@ -0,0 +1,8 @@ +pub trait BinnedValueType {} + +pub struct BinnedNumericValue { + avg: f32, + _t: Option, +} + +impl BinnedValueType for BinnedNumericValue {} diff --git a/crates/items_2/src/binning/container_bins.rs b/crates/items_2/src/binning/container_bins.rs new file mode 100644 index 0000000..4cd0c2e --- /dev/null +++ b/crates/items_2/src/binning/container_bins.rs @@ -0,0 +1,191 @@ +use super::aggregator::AggregatorNumeric; +use super::aggregator::AggregatorTimeWeight; +use super::container_events::EventValueType; +use super::___; +use crate::vecpreview::PreviewRange; +use crate::vecpreview::VecPreview; +use core::fmt; +use err::thiserror; +use err::ThisError; +use netpod::TsNano; +use serde::Deserialize; +use serde::Serialize; +use std::any; +use std::collections::VecDeque; + +#[allow(unused)] +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } + +#[derive(Debug, ThisError)] +#[cstm(name = "ContainerBins")] +pub enum ContainerBinsError { + Unordered, +} + +pub trait BinValueType: fmt::Debug + Clone + PartialOrd { + // type Container: Container; + // type AggregatorTimeWeight: AggregatorTimeWeight; + // type AggTimeWeightOutputAvg; + + // fn identity_sum() -> Self; + // fn add_weighted(&self, add: &Self, f: f32) -> Self; +} + +#[derive(Debug, Clone)] +pub struct BinSingle { + pub ts1: TsNano, + pub ts2: TsNano, + pub cnt: u64, + pub min: EVT, + pub max: EVT, + pub avg: f32, + pub lst: EVT, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct ContainerBins +where + EVT: EventValueType, +{ + ts1s: VecDeque, + ts2s: VecDeque, + cnts: VecDeque, + mins: VecDeque, + maxs: VecDeque, + avgs: VecDeque, + lsts: VecDeque, +} + +impl ContainerBins +where + EVT: EventValueType, +{ + pub fn type_name() -> &'static str { + any::type_name::() + } + + pub fn new() -> Self { + Self { + ts1s: VecDeque::new(), + ts2s: VecDeque::new(), + cnts: VecDeque::new(), + mins: VecDeque::new(), + maxs: VecDeque::new(), + avgs: VecDeque::new(), + lsts: VecDeque::new(), + } + } + + pub fn len(&self) -> usize { + self.ts1s.len() + } + + pub fn verify(&self) -> Result<(), ContainerBinsError> { + if self.ts1s.iter().zip(self.ts1s.iter().skip(1)).any(|(&a, &b)| a > b) { + return Err(ContainerBinsError::Unordered); + } + if self.ts2s.iter().zip(self.ts2s.iter().skip(1)).any(|(&a, &b)| a > b) { + return Err(ContainerBinsError::Unordered); + } + Ok(()) + } + + pub fn ts1_first(&self) -> Option { + self.ts1s.front().map(|&x| x) + } + + pub fn ts2_last(&self) -> Option { + self.ts2s.back().map(|&x| x) + } + + pub fn len_before(&self, end: TsNano) -> usize { + let pp = self.ts2s.partition_point(|&x| x <= end); + assert!(pp <= self.len(), "len_before pp {} len {}", pp, self.len()); + pp + } + + pub fn pop_front(&mut self) -> Option> { + let ts1 = if let Some(x) = self.ts1s.pop_front() { + x + } else { + return None; + }; + let ts2 = if let Some(x) = self.ts2s.pop_front() { + x + } else { + return None; + }; + todo!() + } + + // pub fn push_back(&mut self, ts1: TsNano, val: EVT) { + // self.tss.push_back(ts); + // self.vals.push_back(val); + // } +} + +impl fmt::Debug for ContainerBins +where + EVT: EventValueType, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let self_name = any::type_name::(); + write!( + fmt, + "{self_name} {{ len: {:?}, ts1s: {:?}, ts2s: {:?}, cnts: {:?}, avgs {:?} }}", + self.len(), + VecPreview::new(&self.ts1s), + VecPreview::new(&self.ts2s), + VecPreview::new(&self.cnts), + VecPreview::new(&self.avgs), + ) + } +} + +pub struct ContainerBinsTakeUpTo<'a, EVT> +where + EVT: EventValueType, +{ + evs: &'a mut ContainerBins, + len: usize, +} + +impl<'a, EVT> ContainerBinsTakeUpTo<'a, EVT> +where + EVT: EventValueType, +{ + pub fn new(evs: &'a mut ContainerBins, len: usize) -> Self { + let len = len.min(evs.len()); + Self { evs, len } + } +} + +impl<'a, EVT> ContainerBinsTakeUpTo<'a, EVT> +where + EVT: EventValueType, +{ + pub fn ts1_first(&self) -> Option { + self.evs.ts1_first() + } + + pub fn ts2_last(&self) -> Option { + self.evs.ts2_last() + } + + pub fn len(&self) -> usize { + self.len + } + + pub fn pop_front(&mut self) -> Option> { + if self.len != 0 { + if let Some(ev) = self.evs.pop_front() { + self.len -= 1; + Some(ev) + } else { + None + } + } else { + None + } + } +} diff --git a/crates/items_2/src/binning/container_events.rs b/crates/items_2/src/binning/container_events.rs index bc137fe..74ced5c 100644 --- a/crates/items_2/src/binning/container_events.rs +++ b/crates/items_2/src/binning/container_events.rs @@ -29,8 +29,10 @@ pub trait Container: fmt::Debug + Clone + PreviewRange + Serialize + for<'a pub trait EventValueType: fmt::Debug + Clone + PartialOrd { type Container: Container; type AggregatorTimeWeight: AggregatorTimeWeight; + type AggTimeWeightOutputAvg; - fn sum_identity() -> Self; + fn identity_sum() -> Self; + fn add_weighted(&self, add: &Self, f: f32) -> Self; } impl Container for VecDeque @@ -53,28 +55,43 @@ where impl EventValueType for f32 { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; + type AggTimeWeightOutputAvg = >::OutputAvg; - fn sum_identity() -> Self { + fn identity_sum() -> Self { 0. } + + fn add_weighted(&self, add: &Self, f: f32) -> Self { + todo!() + } } impl EventValueType for f64 { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; + type AggTimeWeightOutputAvg = >::OutputAvg; - fn sum_identity() -> Self { + fn identity_sum() -> Self { 0. } + + fn add_weighted(&self, add: &Self, f: f32) -> Self { + todo!() + } } impl EventValueType for u64 { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; + type AggTimeWeightOutputAvg = >::OutputAvg; - fn sum_identity() -> Self { + fn identity_sum() -> Self { 0 } + + fn add_weighted(&self, add: &Self, f: f32) -> Self { + todo!() + } } #[derive(Debug, Clone)] @@ -138,7 +155,7 @@ where pp } - pub fn event_next(&mut self) -> Option> { + pub fn pop_front(&mut self) -> Option> { if let (Some(ts), Some(val)) = (self.tss.pop_front(), self.vals.pop_front()) { Some(EventSingle { ts, val }) } else { @@ -202,9 +219,9 @@ where self.len } - pub fn event_next(&mut self) -> Option> { + pub fn pop_front(&mut self) -> Option> { if self.len != 0 { - if let Some(ev) = self.evs.event_next() { + if let Some(ev) = self.evs.pop_front() { self.len -= 1; Some(ev) } else { diff --git a/crates/items_2/src/binning/test/events00.rs b/crates/items_2/src/binning/test/events00.rs index e5d29e7..a64b579 100644 --- a/crates/items_2/src/binning/test/events00.rs +++ b/crates/items_2/src/binning/test/events00.rs @@ -53,7 +53,7 @@ fn test_bin_events_f32_simple_01() -> Result<(), Error> { } #[test] -fn test_bin_events_enum_simple_00() -> Result<(), Error> { +fn test_bin_events_enum_simple_range_final() -> Result<(), Error> { let beg = TsNano::from_ms(100); let end = TsNano::from_ms(120); let nano_range = NanoRange { @@ -66,5 +66,6 @@ fn test_bin_events_enum_simple_00() -> Result<(), Error> { evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one")); evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two")); binner.ingest(evs)?; + binner.range_final()?; Ok(()) } diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs index 1116fee..7de87f1 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs @@ -11,6 +11,7 @@ use netpod::log::*; use netpod::BinnedRange; use netpod::DtNano; use netpod::TsNano; +use std::collections::VecDeque; use std::marker::PhantomData; use std::pin::Pin; use std::task::Context; @@ -147,7 +148,7 @@ where minmax: &mut MinMax, ) -> Result<(), Error> { trace_ingest_event!("ingest_with_lst_gt_range_beg"); - while let Some(ev) = evs.event_next() { + while let Some(ev) = evs.pop_front() { trace_event_next!("ingest_with_lst_ge_range_beg {:?}", ev); if ev.ts <= self.active_beg { panic!("should never get here"); @@ -168,7 +169,7 @@ where minmax: &mut MinMax, ) -> Result<(), Error> { trace_ingest_event!("ingest_with_lst_ge_range_beg"); - while let Some(ev) = evs.event_next() { + while let Some(ev) = evs.pop_front() { trace_event_next!("ingest_with_lst_ge_range_beg {:?}", ev); if ev.ts < self.active_beg { panic!("should never get here"); @@ -207,6 +208,24 @@ where Ok(()) } } + + // PRECONDITION: filled_until < ts <= active_end + fn fill_until(&mut self, ts: TsNano, lst: LstRef) { + trace_cycle!("fill_until ts {:?}", ts); + let b = self; + assert!(b.filled_until < ts); + assert!(ts <= b.active_end); + b.agg.ingest(ts.delta(b.filled_until), b.active_len, lst.0.val.clone()); + b.filled_until = ts; + } + + fn fill_remaining_if_space_left(&mut self, lst: LstRef) { + trace_cycle!("fill_remaining_if_space_left"); + let b = self; + if b.filled_until < b.active_end { + b.fill_until(b.active_end, lst); + } + } } struct InnerA @@ -249,7 +268,7 @@ where if let Some(minmax) = self.minmax.as_mut() { self.inner_b.ingest_with_lst_minmax(evs, lst, minmax) } else { - if let Some(ev) = evs.event_next() { + if let Some(ev) = evs.pop_front() { trace_event_next!("ingest_with_lst {:?}", ev); let beg = self.inner_b.active_beg; let end = self.inner_b.active_end; @@ -290,6 +309,7 @@ where lst: Option>, range: BinnedRange, inner_a: InnerA, + out: VecDeque, } impl BinnedEventsTimeweight @@ -314,12 +334,13 @@ where minmax: None, }, lst: None, + out: VecDeque::new(), } } fn ingest_event_without_lst(&mut self, ev: EventSingle) -> Result<(), Error> { if ev.ts >= self.inner_a.inner_b.active_end { - Err(Error::EventAfterRange) + panic!("should never get here"); } else { trace_ingest_init_lst!("ingest_event_without_lst set lst {:?}", ev); self.lst = Some(ev.clone()); @@ -333,10 +354,10 @@ where } fn ingest_without_lst(&mut self, mut evs: ContainerEventsTakeUpTo) -> Result<(), Error> { - if let Some(ev) = evs.event_next() { + if let Some(ev) = evs.pop_front() { trace_event_next!("ingest_without_lst {:?}", ev); if ev.ts >= self.inner_a.inner_b.active_end { - Err(Error::EventAfterRange) + panic!("should never get here"); } else { self.ingest_event_without_lst(ev)?; if let Some(lst) = self.lst.as_mut() { @@ -364,6 +385,51 @@ where } } + fn cycle_01(&mut self, ts: TsNano) { + let b = &self.inner_a.inner_b; + trace_cycle!("cycle_01 {:?} {:?}", ts, b.active_end); + assert!(b.active_beg < ts); + let div = self.range.bin_len.ns(); + if let Some(lst) = self.lst.as_ref() { + loop { + let b = &self.inner_a.inner_b; + if b.filled_until >= ts { + break; + } + if ts >= b.active_end { + self.inner_a.inner_b.fill_remaining_if_space_left(LstRef(lst)); + let b = &mut self.inner_a.inner_b; + { + // TODO push bin to output. + let res = b.agg.result_and_reset_for_new_bin(); + let cnt = b.cnt; + b.cnt = 0; + } + trace_cycle!("cycle_01 filled up to {:?} emit and reset", b.active_end); + let old_end = b.active_end; + let ts1 = TsNano::from_ns(b.active_end.ns() / div * div); + assert!(ts1 == old_end); + b.active_beg = ts1; + b.active_end = ts1.add_dt_nano(b.active_len); + b.filled_until = ts1; + self.inner_a.minmax = Some((lst.clone(), lst.clone())); + } else { + self.inner_a.inner_b.fill_until(ts, LstRef(lst)); + } + } + } else { + let ts1 = TsNano::from_ns(ts.ns() / div * div); + let b = &mut self.inner_a.inner_b; + b.active_beg = ts1; + b.active_end = ts1.add_dt_nano(b.active_len); + b.filled_until = ts1; + b.cnt = 0; + b.agg.reset_for_new_bin(); + assert!(self.inner_a.minmax.is_none()); + trace_cycle!("cycled direct to {:?} {:?}", b.active_beg, b.active_end); + } + } + pub fn ingest(&mut self, mut evs_all: ContainerEvents) -> Result<(), Error> { // It is this type's task to find and store the one-before event. // We then pass it to the aggregation. @@ -382,35 +448,13 @@ where // How to handle to not emit bins until at least some partially filled bin is encountered? break if let Some(ts) = evs_all.ts_first() { let b = &mut self.inner_a.inner_b; + if ts >= self.range.nano_end() { + return Err(Error::EventAfterRange); + } if ts >= b.active_end { trace_cycle!("bin edge boundary {:?}", b.active_end); - if let Some(lst) = self.lst.as_ref() { - trace_cycle!("fill remaining width"); - self.inner_a - .inner_b - .ingest_event_with_lst_gt_range_beg_agg(lst.clone(), LstRef(lst)); - } else { - // nothing to do - } - let b = &mut self.inner_a.inner_b; - if b.filled_until < b.active_beg { - panic!("fille until before bin begin"); - } else if b.filled_until == b.active_beg { - // TODO bin is meaningless - } else { - // TODO need the output type. - } - trace_cycle!("cycle bin {:?} {:?}", ts, b.active_end); - // TODO check if the bin has content to emit: either it itself contains events, or is filled with lst value. - // For the check for filled with lst I might need another flag. - let div = self.range.bin_len.ns(); - let ts1 = TsNano::from_ns(ts.ns() / div * div); - b.active_beg = ts1; - b.active_end = ts1.add_dt_nano(b.active_len); - b.filled_until = ts1; - b.cnt = 0; - b.agg.reset_for_new_bin(); - trace_cycle!("cycled to {:?} {:?}", b.active_beg, b.active_end); + assert!(b.filled_until < b.active_beg); + self.cycle_01(ts); } let n1 = evs_all.len(); let len_before = evs_all.len_before(self.inner_a.inner_b.active_end); @@ -440,7 +484,9 @@ where } pub fn range_final(&mut self) -> Result<(), Error> { - todo!() + trace_cycle!("range_final"); + self.cycle_01(self.range.nano_end()); + Ok(()) } } diff --git a/crates/items_2/src/binning/valuetype.rs b/crates/items_2/src/binning/valuetype.rs index a8ac319..413c43f 100644 --- a/crates/items_2/src/binning/valuetype.rs +++ b/crates/items_2/src/binning/valuetype.rs @@ -1,4 +1,5 @@ use super::aggregator::AggregatorTimeWeight; +use super::binnedvaluetype::BinnedNumericValue; use super::container_events::Container; use super::container_events::EventValueType; use crate::vecpreview::PreviewRange; @@ -48,27 +49,41 @@ pub struct EnumVariantAggregatorTimeWeight { } impl AggregatorTimeWeight for EnumVariantAggregatorTimeWeight { + type OutputAvg = f32; + fn new() -> Self { Self { sum: 0. } } - fn reset_for_new_bin(&mut self) { - self.sum = 0. - } - fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EnumVariant) { let f = dt.ns() as f32 / bl.ns() as f32; - eprintln!("INGEST {} {:?}", f, val); - let h = items_0::scalar_ops::AsPrimF32::as_prim_f32_b(&val); - self.sum += f * h; + eprintln!("INGEST ENUM {} {:?}", f, val); + self.sum += f * val.ix() as f32; + } + + fn reset_for_new_bin(&mut self) { + self.sum = f32::identity_sum(); + } + + fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg { + let ret = self.sum.clone(); + self.sum = f32::identity_sum(); + ret } } impl EventValueType for EnumVariant { type Container = EnumVariantContainer; type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight; + type AggTimeWeightOutputAvg = >::OutputAvg; - fn sum_identity() -> Self { + // TODO remove this from trait, only needed for common numeric cases but not in general. + fn identity_sum() -> Self { + todo!() + } + + // TODO also remove from trait, push it to a more specialized trait for the plain numeric cases. + fn add_weighted(&self, add: &Self, f: f32) -> Self { todo!() } }