From b229a756f8886cb21c3332e7aa749504d256b260 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 24 Sep 2024 14:58:26 +0200 Subject: [PATCH] WIP --- crates/items_2/src/binning/aggregator.rs | 86 ++++++++++--------- crates/items_2/src/binning/container_bins.rs | 24 ++++-- .../items_2/src/binning/container_events.rs | 15 ++-- crates/items_2/src/binning/test/events00.rs | 2 + .../binning/timeweight/timeweight_events.rs | 22 ++++- crates/items_2/src/binning/valuetype.rs | 6 +- 6 files changed, 97 insertions(+), 58 deletions(-) diff --git a/crates/items_2/src/binning/aggregator.rs b/crates/items_2/src/binning/aggregator.rs index 18993af..d29d5a6 100644 --- a/crates/items_2/src/binning/aggregator.rs +++ b/crates/items_2/src/binning/aggregator.rs @@ -1,101 +1,107 @@ -use super::binnedvaluetype::BinnedNumericValue; -use super::binnedvaluetype::BinnedValueType; use super::container_events::EventValueType; +use core::fmt; use netpod::DtNano; +use serde::Deserialize; +use serde::Serialize; + +pub trait AggTimeWeightOutputAvg: fmt::Debug + Clone + Serialize + for<'a> Deserialize<'a> {} + +impl AggTimeWeightOutputAvg for u64 {} + +impl AggTimeWeightOutputAvg for f32 {} + +impl AggTimeWeightOutputAvg for f64 {} pub trait AggregatorTimeWeight where EVT: EventValueType, { - type OutputAvg; - fn new() -> Self; fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT); fn reset_for_new_bin(&mut self); - fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg; + fn result_and_reset_for_new_bin(&mut self) -> EVT::AggTimeWeightOutputAvg; } -pub struct AggregatorNumeric { - sum: EVT, +pub struct AggregatorNumeric { + sum: f64, } -trait AggWithSame: EventValueType {} +trait AggWithF64: EventValueType { + fn as_f64(&self) -> f64; +} -impl AggWithSame for f64 {} +impl AggWithF64 for f64 { + fn as_f64(&self) -> f64 { + *self + } +} -impl AggregatorTimeWeight for AggregatorNumeric +impl AggregatorTimeWeight for AggregatorNumeric where - EVT: AggWithSame, + EVT: AggWithF64, { - type OutputAvg = EVT; - fn new() -> Self { - Self { - sum: EVT::identity_sum(), - } + Self { sum: 0. } } fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT) { - let f = dt.ns() as f32 / bl.ns() as f32; + let f = dt.ns() as f64 / bl.ns() as f64; eprintln!("INGEST {} {:?}", f, val); - self.sum.add_weighted(&val, f); + self.sum += f * val.as_f64(); } fn reset_for_new_bin(&mut self) { - self.sum = EVT::identity_sum(); + self.sum = 0.; } - fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg { + fn result_and_reset_for_new_bin(&mut self) -> EVT::AggTimeWeightOutputAvg { + // fn result_and_reset_for_new_bin(&mut self) -> f64 { let ret = self.sum.clone(); - self.sum = EVT::identity_sum(); + self.sum = 0.; ret } } -impl AggregatorTimeWeight for AggregatorNumeric { - type OutputAvg = f32; - +impl AggregatorTimeWeight for AggregatorNumeric { fn new() -> Self { - Self { - sum: f32::identity_sum(), - } + Self { sum: 0. } } fn ingest(&mut self, dt: DtNano, bl: DtNano, val: f32) { - let f = dt.ns() as f32 / bl.ns() as f32; + let f = dt.ns() as f64 / bl.ns() as f64; eprintln!("INGEST {} {}", f, val); - self.sum += f * val; + self.sum += f * val as f64; } fn reset_for_new_bin(&mut self) { - self.sum = f32::identity_sum(); + self.sum = 0.; } - fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg { + fn result_and_reset_for_new_bin(&mut self) -> f64 { let ret = self.sum.clone(); - self.sum = f32::identity_sum(); + self.sum = 0.; ret } } -impl AggregatorTimeWeight for AggregatorNumeric { - type OutputAvg = u64; - +impl AggregatorTimeWeight for AggregatorNumeric { fn new() -> Self { - todo!() + Self { sum: 0. } } fn ingest(&mut self, dt: DtNano, bl: DtNano, val: u64) { - todo!() + let f = dt.ns() as f64 / bl.ns() as f64; + eprintln!("INGEST {} {}", f, val); + self.sum += f * val as f64; } fn reset_for_new_bin(&mut self) { - self.sum = u64::identity_sum(); + self.sum = 0.; } - fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg { + fn result_and_reset_for_new_bin(&mut self) -> f64 { let ret = self.sum.clone(); - self.sum = u64::identity_sum(); + self.sum = 0.; ret } } diff --git a/crates/items_2/src/binning/container_bins.rs b/crates/items_2/src/binning/container_bins.rs index 4cd0c2e..495e6f4 100644 --- a/crates/items_2/src/binning/container_bins.rs +++ b/crates/items_2/src/binning/container_bins.rs @@ -52,7 +52,7 @@ where cnts: VecDeque, mins: VecDeque, maxs: VecDeque, - avgs: VecDeque, + avgs: VecDeque, lsts: VecDeque, } @@ -118,10 +118,24 @@ where todo!() } - // pub fn push_back(&mut self, ts1: TsNano, val: EVT) { - // self.tss.push_back(ts); - // self.vals.push_back(val); - // } + pub fn push_back( + &mut self, + ts1: TsNano, + ts2: TsNano, + cnt: u64, + min: EVT, + max: EVT, + avg: EVT::AggTimeWeightOutputAvg, + lst: EVT, + ) { + self.ts1s.push_back(ts1); + self.ts2s.push_back(ts2); + self.cnts.push_back(cnt); + self.mins.push_back(min); + self.maxs.push_back(max); + self.avgs.push_back(avg); + self.lsts.push_back(lst); + } } impl fmt::Debug for ContainerBins diff --git a/crates/items_2/src/binning/container_events.rs b/crates/items_2/src/binning/container_events.rs index 74ced5c..6b1ab93 100644 --- a/crates/items_2/src/binning/container_events.rs +++ b/crates/items_2/src/binning/container_events.rs @@ -1,3 +1,4 @@ +use super::aggregator::AggTimeWeightOutputAvg; use super::aggregator::AggregatorNumeric; use super::aggregator::AggregatorTimeWeight; use super::___; @@ -29,7 +30,7 @@ pub trait Container: fmt::Debug + Clone + PreviewRange + Serialize + for<'a pub trait EventValueType: fmt::Debug + Clone + PartialOrd { type Container: Container; type AggregatorTimeWeight: AggregatorTimeWeight; - type AggTimeWeightOutputAvg; + type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg; fn identity_sum() -> Self; fn add_weighted(&self, add: &Self, f: f32) -> Self; @@ -54,8 +55,8 @@ where impl EventValueType for f32 { type Container = VecDeque; - type AggregatorTimeWeight = AggregatorNumeric; - type AggTimeWeightOutputAvg = >::OutputAvg; + type AggregatorTimeWeight = AggregatorNumeric; + type AggTimeWeightOutputAvg = f64; fn identity_sum() -> Self { 0. @@ -68,8 +69,8 @@ impl EventValueType for f32 { impl EventValueType for f64 { type Container = VecDeque; - type AggregatorTimeWeight = AggregatorNumeric; - type AggTimeWeightOutputAvg = >::OutputAvg; + type AggregatorTimeWeight = AggregatorNumeric; + type AggTimeWeightOutputAvg = f64; fn identity_sum() -> Self { 0. @@ -82,8 +83,8 @@ impl EventValueType for f64 { impl EventValueType for u64 { type Container = VecDeque; - type AggregatorTimeWeight = AggregatorNumeric; - type AggTimeWeightOutputAvg = >::OutputAvg; + type AggregatorTimeWeight = AggregatorNumeric; + type AggTimeWeightOutputAvg = f64; fn identity_sum() -> Self { 0 diff --git a/crates/items_2/src/binning/test/events00.rs b/crates/items_2/src/binning/test/events00.rs index a64b579..578380f 100644 --- a/crates/items_2/src/binning/test/events00.rs +++ b/crates/items_2/src/binning/test/events00.rs @@ -67,5 +67,7 @@ fn test_bin_events_enum_simple_range_final() -> Result<(), Error> { evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two")); binner.ingest(evs)?; binner.range_final()?; + let bins = binner.output(); + eprintln!("{:?}", bins); Ok(()) } diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs index 7de87f1..39db0d9 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs @@ -1,6 +1,7 @@ use super::super::container_events::EventValueType; use super::___; use crate::binning::aggregator::AggregatorTimeWeight; +use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::ContainerEvents; use crate::binning::container_events::ContainerEventsTakeUpTo; use crate::binning::container_events::EventSingle; @@ -309,7 +310,7 @@ where lst: Option>, range: BinnedRange, inner_a: InnerA, - out: VecDeque, + out: ContainerBins, } impl BinnedEventsTimeweight @@ -334,7 +335,7 @@ where minmax: None, }, lst: None, - out: VecDeque::new(), + out: ContainerBins::new(), } } @@ -399,11 +400,24 @@ where if ts >= b.active_end { self.inner_a.inner_b.fill_remaining_if_space_left(LstRef(lst)); let b = &mut self.inner_a.inner_b; + let minmax = self.inner_a.minmax.get_or_insert_with(|| { + trace_cycle!("cycle_01 minmax not yet set"); + (lst.clone(), lst.clone()) + }); { // TODO push bin to output. let res = b.agg.result_and_reset_for_new_bin(); let cnt = b.cnt; b.cnt = 0; + self.out.push_back( + b.active_beg, + b.active_end, + b.cnt, + minmax.0.val.clone(), + minmax.1.val.clone(), + res, + lst.val.clone(), + ); } trace_cycle!("cycle_01 filled up to {:?} emit and reset", b.active_end); let old_end = b.active_end; @@ -488,6 +502,10 @@ where self.cycle_01(self.range.nano_end()); Ok(()) } + + pub fn output(&mut self) -> ContainerBins { + ::core::mem::replace(&mut self.out, ContainerBins::new()) + } } pub struct BinnedEventsTimeweightStream {} diff --git a/crates/items_2/src/binning/valuetype.rs b/crates/items_2/src/binning/valuetype.rs index 413c43f..c15af30 100644 --- a/crates/items_2/src/binning/valuetype.rs +++ b/crates/items_2/src/binning/valuetype.rs @@ -49,8 +49,6 @@ pub struct EnumVariantAggregatorTimeWeight { } impl AggregatorTimeWeight for EnumVariantAggregatorTimeWeight { - type OutputAvg = f32; - fn new() -> Self { Self { sum: 0. } } @@ -65,7 +63,7 @@ impl AggregatorTimeWeight for EnumVariantAggregatorTimeWeight { self.sum = f32::identity_sum(); } - fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg { + fn result_and_reset_for_new_bin(&mut self) -> ::AggTimeWeightOutputAvg { let ret = self.sum.clone(); self.sum = f32::identity_sum(); ret @@ -75,7 +73,7 @@ impl AggregatorTimeWeight for EnumVariantAggregatorTimeWeight { impl EventValueType for EnumVariant { type Container = EnumVariantContainer; type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight; - type AggTimeWeightOutputAvg = >::OutputAvg; + type AggTimeWeightOutputAvg = f32; // TODO remove this from trait, only needed for common numeric cases but not in general. fn identity_sum() -> Self {