This commit is contained in:
Dominik Werder
2024-09-24 14:58:26 +02:00
parent 9e9f33e086
commit b229a756f8
6 changed files with 97 additions and 58 deletions

View File

@@ -1,101 +1,107 @@
use super::binnedvaluetype::BinnedNumericValue;
use super::binnedvaluetype::BinnedValueType;
use super::container_events::EventValueType;
use core::fmt;
use netpod::DtNano;
use serde::Deserialize;
use serde::Serialize;
pub trait AggTimeWeightOutputAvg: fmt::Debug + Clone + Serialize + for<'a> Deserialize<'a> {}
impl AggTimeWeightOutputAvg for u64 {}
impl AggTimeWeightOutputAvg for f32 {}
impl AggTimeWeightOutputAvg for f64 {}
pub trait AggregatorTimeWeight<EVT>
where
EVT: EventValueType,
{
type OutputAvg;
fn new() -> Self;
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT);
fn reset_for_new_bin(&mut self);
fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg;
fn result_and_reset_for_new_bin(&mut self) -> EVT::AggTimeWeightOutputAvg;
}
pub struct AggregatorNumeric<EVT> {
sum: EVT,
pub struct AggregatorNumeric {
sum: f64,
}
trait AggWithSame: EventValueType {}
trait AggWithF64: EventValueType<AggTimeWeightOutputAvg = f64> {
fn as_f64(&self) -> f64;
}
impl AggWithSame for f64 {}
impl AggWithF64 for f64 {
fn as_f64(&self) -> f64 {
*self
}
}
impl<EVT> AggregatorTimeWeight<EVT> for AggregatorNumeric<EVT>
impl<EVT> AggregatorTimeWeight<EVT> for AggregatorNumeric
where
EVT: AggWithSame,
EVT: AggWithF64,
{
type OutputAvg = EVT;
fn new() -> Self {
Self {
sum: EVT::identity_sum(),
}
Self { sum: 0. }
}
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT) {
let f = dt.ns() as f32 / bl.ns() as f32;
let f = dt.ns() as f64 / bl.ns() as f64;
eprintln!("INGEST {} {:?}", f, val);
self.sum.add_weighted(&val, f);
self.sum += f * val.as_f64();
}
fn reset_for_new_bin(&mut self) {
self.sum = EVT::identity_sum();
self.sum = 0.;
}
fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg {
fn result_and_reset_for_new_bin(&mut self) -> EVT::AggTimeWeightOutputAvg {
// fn result_and_reset_for_new_bin(&mut self) -> f64 {
let ret = self.sum.clone();
self.sum = EVT::identity_sum();
self.sum = 0.;
ret
}
}
impl AggregatorTimeWeight<f32> for AggregatorNumeric<f32> {
type OutputAvg = f32;
impl AggregatorTimeWeight<f32> for AggregatorNumeric {
fn new() -> Self {
Self {
sum: f32::identity_sum(),
}
Self { sum: 0. }
}
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: f32) {
let f = dt.ns() as f32 / bl.ns() as f32;
let f = dt.ns() as f64 / bl.ns() as f64;
eprintln!("INGEST {} {}", f, val);
self.sum += f * val;
self.sum += f * val as f64;
}
fn reset_for_new_bin(&mut self) {
self.sum = f32::identity_sum();
self.sum = 0.;
}
fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg {
fn result_and_reset_for_new_bin(&mut self) -> f64 {
let ret = self.sum.clone();
self.sum = f32::identity_sum();
self.sum = 0.;
ret
}
}
impl AggregatorTimeWeight<u64> for AggregatorNumeric<u64> {
type OutputAvg = u64;
impl AggregatorTimeWeight<u64> for AggregatorNumeric {
fn new() -> Self {
todo!()
Self { sum: 0. }
}
fn ingest(&mut self, dt: DtNano, bl: DtNano, val: u64) {
todo!()
let f = dt.ns() as f64 / bl.ns() as f64;
eprintln!("INGEST {} {}", f, val);
self.sum += f * val as f64;
}
fn reset_for_new_bin(&mut self) {
self.sum = u64::identity_sum();
self.sum = 0.;
}
fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg {
fn result_and_reset_for_new_bin(&mut self) -> f64 {
let ret = self.sum.clone();
self.sum = u64::identity_sum();
self.sum = 0.;
ret
}
}

View File

@@ -52,7 +52,7 @@ where
cnts: VecDeque<u64>,
mins: VecDeque<EVT>,
maxs: VecDeque<EVT>,
avgs: VecDeque<f32>,
avgs: VecDeque<EVT::AggTimeWeightOutputAvg>,
lsts: VecDeque<EVT>,
}
@@ -118,10 +118,24 @@ where
todo!()
}
// pub fn push_back(&mut self, ts1: TsNano, val: EVT) {
// self.tss.push_back(ts);
// self.vals.push_back(val);
// }
pub fn push_back(
&mut self,
ts1: TsNano,
ts2: TsNano,
cnt: u64,
min: EVT,
max: EVT,
avg: EVT::AggTimeWeightOutputAvg,
lst: EVT,
) {
self.ts1s.push_back(ts1);
self.ts2s.push_back(ts2);
self.cnts.push_back(cnt);
self.mins.push_back(min);
self.maxs.push_back(max);
self.avgs.push_back(avg);
self.lsts.push_back(lst);
}
}
impl<EVT> fmt::Debug for ContainerBins<EVT>

View File

@@ -1,3 +1,4 @@
use super::aggregator::AggTimeWeightOutputAvg;
use super::aggregator::AggregatorNumeric;
use super::aggregator::AggregatorTimeWeight;
use super::___;
@@ -29,7 +30,7 @@ pub trait Container<EVT>: fmt::Debug + Clone + PreviewRange + Serialize + for<'a
pub trait EventValueType: fmt::Debug + Clone + PartialOrd {
type Container: Container<Self>;
type AggregatorTimeWeight: AggregatorTimeWeight<Self>;
type AggTimeWeightOutputAvg;
type AggTimeWeightOutputAvg: AggTimeWeightOutputAvg;
fn identity_sum() -> Self;
fn add_weighted(&self, add: &Self, f: f32) -> Self;
@@ -54,8 +55,8 @@ where
impl EventValueType for f32 {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = AggregatorNumeric<Self>;
type AggTimeWeightOutputAvg = <Self::AggregatorTimeWeight as AggregatorTimeWeight<Self>>::OutputAvg;
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
fn identity_sum() -> Self {
0.
@@ -68,8 +69,8 @@ impl EventValueType for f32 {
impl EventValueType for f64 {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = AggregatorNumeric<Self>;
type AggTimeWeightOutputAvg = <Self::AggregatorTimeWeight as AggregatorTimeWeight<Self>>::OutputAvg;
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
fn identity_sum() -> Self {
0.
@@ -82,8 +83,8 @@ impl EventValueType for f64 {
impl EventValueType for u64 {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = AggregatorNumeric<Self>;
type AggTimeWeightOutputAvg = <Self::AggregatorTimeWeight as AggregatorTimeWeight<Self>>::OutputAvg;
type AggregatorTimeWeight = AggregatorNumeric;
type AggTimeWeightOutputAvg = f64;
fn identity_sum() -> Self {
0

View File

@@ -67,5 +67,7 @@ fn test_bin_events_enum_simple_range_final() -> Result<(), Error> {
evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two"));
binner.ingest(evs)?;
binner.range_final()?;
let bins = binner.output();
eprintln!("{:?}", bins);
Ok(())
}

View File

@@ -1,6 +1,7 @@
use super::super::container_events::EventValueType;
use super::___;
use crate::binning::aggregator::AggregatorTimeWeight;
use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::ContainerEvents;
use crate::binning::container_events::ContainerEventsTakeUpTo;
use crate::binning::container_events::EventSingle;
@@ -309,7 +310,7 @@ where
lst: Option<EventSingle<EVT>>,
range: BinnedRange<TsNano>,
inner_a: InnerA<EVT>,
out: VecDeque<EVT::AggTimeWeightOutputAvg>,
out: ContainerBins<EVT>,
}
impl<EVT> BinnedEventsTimeweight<EVT>
@@ -334,7 +335,7 @@ where
minmax: None,
},
lst: None,
out: VecDeque::new(),
out: ContainerBins::new(),
}
}
@@ -399,11 +400,24 @@ where
if ts >= b.active_end {
self.inner_a.inner_b.fill_remaining_if_space_left(LstRef(lst));
let b = &mut self.inner_a.inner_b;
let minmax = self.inner_a.minmax.get_or_insert_with(|| {
trace_cycle!("cycle_01 minmax not yet set");
(lst.clone(), lst.clone())
});
{
// TODO push bin to output.
let res = b.agg.result_and_reset_for_new_bin();
let cnt = b.cnt;
b.cnt = 0;
self.out.push_back(
b.active_beg,
b.active_end,
b.cnt,
minmax.0.val.clone(),
minmax.1.val.clone(),
res,
lst.val.clone(),
);
}
trace_cycle!("cycle_01 filled up to {:?} emit and reset", b.active_end);
let old_end = b.active_end;
@@ -488,6 +502,10 @@ where
self.cycle_01(self.range.nano_end());
Ok(())
}
pub fn output(&mut self) -> ContainerBins<EVT> {
::core::mem::replace(&mut self.out, ContainerBins::new())
}
}
pub struct BinnedEventsTimeweightStream {}

View File

@@ -49,8 +49,6 @@ pub struct EnumVariantAggregatorTimeWeight {
}
impl AggregatorTimeWeight<EnumVariant> for EnumVariantAggregatorTimeWeight {
type OutputAvg = f32;
fn new() -> Self {
Self { sum: 0. }
}
@@ -65,7 +63,7 @@ impl AggregatorTimeWeight<EnumVariant> for EnumVariantAggregatorTimeWeight {
self.sum = f32::identity_sum();
}
fn result_and_reset_for_new_bin(&mut self) -> Self::OutputAvg {
fn result_and_reset_for_new_bin(&mut self) -> <EnumVariant as EventValueType>::AggTimeWeightOutputAvg {
let ret = self.sum.clone();
self.sum = f32::identity_sum();
ret
@@ -75,7 +73,7 @@ impl AggregatorTimeWeight<EnumVariant> for EnumVariantAggregatorTimeWeight {
impl EventValueType for EnumVariant {
type Container = EnumVariantContainer;
type AggregatorTimeWeight = EnumVariantAggregatorTimeWeight;
type AggTimeWeightOutputAvg = <Self::AggregatorTimeWeight as AggregatorTimeWeight<Self>>::OutputAvg;
type AggTimeWeightOutputAvg = f32;
// TODO remove this from trait, only needed for common numeric cases but not in general.
fn identity_sum() -> Self {