From e17bb885fc2308268ecbe620e87e7b71b0487358 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 20 Nov 2024 16:12:28 +0100 Subject: [PATCH] Attempt better binned bins --- src/binning/aggregator.rs | 4 +- src/binning/container/bins.rs | 77 +++++++----- src/binning/container_bins.rs | 5 +- src/binning/test/bins00.rs | 2 +- src/binning/test/bins_gen.rs | 16 ++- src/binning/timeweight/timeweight_bins.rs | 136 ++++++++++++++++++---- src/lib.rs | 3 + 7 files changed, 184 insertions(+), 59 deletions(-) diff --git a/src/binning/aggregator.rs b/src/binning/aggregator.rs index cd860b2..705bfc3 100644 --- a/src/binning/aggregator.rs +++ b/src/binning/aggregator.rs @@ -61,7 +61,7 @@ impl AggWithF64 for f64 { impl AggregatorTimeWeight for AggregatorNumeric where - EVT: AggWithF64, + EVT: EventValueType + AggWithF64, { fn new() -> Self { Self { sum: 0. } @@ -87,7 +87,7 @@ where sum, filled_width_fraction ); - self.sum = 0.; + >::reset_for_new_bin(self); sum / filled_width_fraction as f64 } } diff --git a/src/binning/container/bins.rs b/src/binning/container/bins.rs index ae261ec..1cac8fb 100644 --- a/src/binning/container/bins.rs +++ b/src/binning/container/bins.rs @@ -1,5 +1,3 @@ -use crate::binning::container_bins::ContainerBins; -use crate::binning::container_events::EventValueType; use crate::binning::container_events::PartialOrdEvtA; use items_0::vecpreview::PreviewRange; use netpod::DtNano; @@ -13,9 +11,9 @@ where BVT: BinAggedType, { fn new() -> Self; - fn ingest(&mut self, bl: DtNano, val: BVT); + fn ingest(&mut self, dt: DtNano, bl: DtNano, cnt: u64, val: BVT); + fn result(&mut self, filled_width_fraction: f32) -> BVT; fn reset_for_new_bin(&mut self); - fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> BVT; } pub trait BinAggedContainer: @@ -33,20 +31,10 @@ pub trait BinAggedType: fmt::Debug + Clone + PartialOrd + Send + 'static + Serialize + for<'a> Deserialize<'a> { type Container: BinAggedContainer; - type AggregatorTimeWeight: AggBinValTw; + type AggregatorTw: AggBinValTw; type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA + Into; } -impl PreviewRange for ContainerBins -where - EVT: EventValueType, - BVT: BinAggedType, -{ - fn preview<'a>(&'a self) -> Box { - todo!() - } -} - impl BinAggedContainer for VecDeque where BVT: BinAggedType, @@ -91,35 +79,64 @@ where impl BinAggedType for f32 { type Container = VecDeque; - type AggregatorTimeWeight = (); + type AggregatorTw = AggBinValTwF32; type IterTy1<'a> = Self; } impl BinAggedType for f64 { type Container = VecDeque; - type AggregatorTimeWeight = (); + type AggregatorTw = AggBinValTwF64; type IterTy1<'a> = Self; } -impl AggBinValTw for () -where - T: BinAggedType, -{ +#[derive(Debug)] +pub struct AggBinValTwF32 { + sum: f32, +} + +impl AggBinValTw for AggBinValTwF32 { fn new() -> Self { - todo!() + Self { sum: 0. } } - fn ingest(&mut self, bl: DtNano, val: T) { - todo!() + fn ingest(&mut self, dt: DtNano, bl: DtNano, cnt: u64, val: f32) { + let f = dt.ns() as f32 / bl.ns() as f32; + self.sum += f * val; + } + + fn result(&mut self, filled_width_fraction: f32) -> f32 { + let ret = self.sum.clone() / filled_width_fraction; + >::reset_for_new_bin(self); + ret } fn reset_for_new_bin(&mut self) { - todo!() - } - - fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> T { - todo!() + self.sum = 0.; } } -pub struct DummyPayload {} +#[derive(Debug)] +pub struct AggBinValTwF64 { + sum: f64, +} + +impl AggBinValTw for AggBinValTwF64 { + fn new() -> Self { + Self { sum: 0. } + } + + fn ingest(&mut self, dt: DtNano, bl: DtNano, cnt: u64, val: f64) { + let f = dt.ns() as f32 / bl.ns() as f32; + self.sum += f as f64 * val; + } + + fn result(&mut self, filled_width_fraction: f32) -> f64 { + let ret = self.sum.clone() / filled_width_fraction as f64; + >::reset_for_new_bin(self); + ret + } + + fn reset_for_new_bin(&mut self) { + self.sum = 0.; + } +} diff --git a/src/binning/container_bins.rs b/src/binning/container_bins.rs index f1be89a..f686bff 100644 --- a/src/binning/container_bins.rs +++ b/src/binning/container_bins.rs @@ -621,10 +621,6 @@ where EVT: EventValueType, BVT: BinAggedType, { - fn type_name(&self) -> &'static str { - any::type_name::() - } - fn empty(&self) -> BinsBoxed { Box::new(Self::new()) } @@ -668,6 +664,7 @@ where range: netpod::BinnedRange, ) -> Box { let ret = super::timeweight::timeweight_bins::BinnedBinsTimeweight::< + EVT, EVT::AggTimeWeightOutputAvg, >::new(range); Box::new(ret) diff --git a/src/binning/test/bins00.rs b/src/binning/test/bins00.rs index b49d230..a34153a 100644 --- a/src/binning/test/bins00.rs +++ b/src/binning/test/bins00.rs @@ -29,7 +29,7 @@ fn test_bin_events_f32_simple_01() -> Result<(), Error> { let fut = async { let beg = TsNano::from_ms(100); let end = TsNano::from_ms(500); - let bin_len = DtMs::from_ms_u64(10); + let bin_len = DtMs::from_ms_u64(100); let nano_range = NanoRange { beg: beg.ns(), end: end.ns(), diff --git a/src/binning/test/bins_gen.rs b/src/binning/test/bins_gen.rs index ec12c56..fb8f2a5 100644 --- a/src/binning/test/bins_gen.rs +++ b/src/binning/test/bins_gen.rs @@ -5,6 +5,8 @@ use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::Sitemty; use items_0::streamitem::StreamItem; use items_0::timebin::BinningggContainerBinsDyn; +use netpod::DtNano; +use netpod::TsNano; use std::pin::Pin; pub(super) fn boxed_conts(inp: S) -> Pin::Item> + Send>> @@ -16,9 +18,19 @@ where pub(super) fn bins_gen_dim0_f32_v00( ) -> impl Stream>> { - futures_util::stream::iter((0usize..1000).into_iter()) + futures_util::stream::iter((9u64..100).into_iter()) .map(|x| { - let c = ContainerBins::::new(); + let mut c = ContainerBins::::new(); + let bl = DtNano::from_ms(10); + let ts1 = TsNano::from_ms(bl.ms_u64() * x); + let ts2 = ts1.add_dt_nano(bl); + let cnt = 8; + let min = 2.; + let max = 4.; + let agg = 2.2; + let lst = 2.4; + let fnl = true; + c.push_back(ts1, ts2, cnt, min, max, agg, lst, fnl); Box::new(c) as Box }) .map(|x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(x)))) diff --git a/src/binning/timeweight/timeweight_bins.rs b/src/binning/timeweight/timeweight_bins.rs index fe01278..c953287 100644 --- a/src/binning/timeweight/timeweight_bins.rs +++ b/src/binning/timeweight/timeweight_bins.rs @@ -1,3 +1,4 @@ +use crate::binning::container::bins::AggBinValTw; use crate::binning::container::bins::BinAggedType; use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::EventValueType; @@ -7,59 +8,154 @@ use items_0::timebin::BinningggError; use items_0::timebin::BinsBoxed; use netpod::BinnedRange; use netpod::TsNano; -use std::marker::PhantomData; +use std::any; -#[allow(unused)] macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } +macro_rules! trace_ingest_bin { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } + +#[derive(Debug, thiserror::Error)] +#[cstm(name = "BinBinsTimeweight")] +pub enum Error {} + #[derive(Debug)] -pub struct BinnedBinsTimeweight +pub struct BinnedBinsTimeweight where + EVT: EventValueType, BVT: BinAggedType, { range: BinnedRange, - // out: ContainerBins, - produce_cnt_zero: bool, - // agg: , - t1: PhantomData, + active_beg: TsNano, + active_end: TsNano, + cnt: u64, + min: Option, + max: Option, + lst: Option, + agg: ::AggregatorTw, + non_fnl: bool, + out: ContainerBins, } -impl BinnedBinsTimeweight +impl BinnedBinsTimeweight where + EVT: EventValueType, BVT: BinAggedType, { pub fn new(range: BinnedRange) -> Self { trace_init!("BinnedBinsTimeweight::new {}", range); let active_beg = range.nano_beg(); - let active_end = active_beg.add_dt_nano(range.bin_len.to_dt_nano()); - let active_len = active_end.delta(active_beg); + let active_end = active_beg.add_dt_nano(range.bin_len_dt_ns()); Self { range, - // out: ContainerBins::new(), - produce_cnt_zero: true, - // agg: todo!(), - t1: PhantomData, + active_beg, + active_end, + cnt: 0, + min: None, + max: None, + lst: None, + agg: BVT::AggregatorTw::new(), + non_fnl: false, + out: ContainerBins::new(), } } + + fn maybe_emit_active(&mut self) { + if self.cnt != 0 { + let ts1 = self.active_beg; + let ts2 = self.active_end; + let cnt = self.cnt; + let min = self.min.as_ref().unwrap().clone(); + let max = self.max.as_ref().unwrap().clone(); + let fr = 1.; + let agg = self.agg.result(fr); + self.agg.reset_for_new_bin(); + let lst = self.lst.as_ref().unwrap().clone(); + let fnl = self.non_fnl == false; + self.out.push_back(ts1, ts2, cnt, min, max, agg, lst, fnl); + } + } + + fn active_forward(&mut self, ts1: TsNano) { + self.cnt = 0; + self.min = self.lst.clone(); + self.max = self.lst.clone(); + let bl = self.range.bin_len_dt_ns(); + let tsnext = TsNano::from_ns(ts1.ns() / bl.ns() * bl.ns()); + self.active_beg = tsnext; + self.active_end = tsnext.add_dt_nano(bl); + self.non_fnl = false; + } + + fn bound(a: &mut Option, b: &EVT, f: impl Fn(&EVT, &EVT) -> bool) { + if let Some(x) = a.as_mut() { + if f(b, x) { + *x = b.clone(); + } + } else { + *a = Some(b.clone()); + } + } + + fn ingest_bins(&mut self, bins: &ContainerBins) -> Result<(), BinningggError> { + for (((((((&ts1, &ts2), &cnt), min), max), agg), lst), &fnl) in bins.zip_iter() { + let grid = self.range.bin_len_dt_ns(); + trace_ingest_bin!("grid {:?} ts1 {:?} agg {:?}", grid, ts1, agg); + if ts1 < self.active_beg { + self.lst = Some(lst.clone()); + } else { + if ts1 >= self.active_end { + self.maybe_emit_active(); + self.active_forward(ts1); + } + self.cnt += cnt; + Self::bound(&mut self.min, min, PartialOrd::lt); + Self::bound(&mut self.max, max, PartialOrd::gt); + let dt = ts2.delta(ts1); + let bl = self.range.bin_len_dt_ns(); + self.agg.ingest(dt, bl, cnt, agg.clone()); + self.non_fnl |= !fnl; + self.lst = Some(lst.clone()); + } + } + Ok(()) + } } -impl BinnedBinsTimeweightTrait for BinnedBinsTimeweight +impl BinnedBinsTimeweightTrait for BinnedBinsTimeweight where + EVT: EventValueType, BVT: BinAggedType, { - fn ingest(&mut self, evs: &BinsBoxed) -> Result<(), BinningggError> { - todo!() + fn ingest(&mut self, bins: &BinsBoxed) -> Result<(), BinningggError> { + if let Some(bins) = bins.as_any_ref().downcast_ref::>() { + self.ingest_bins(bins) + } else { + Err(BinningggError::TypeMismatch { + have: bins.type_name().into(), + expect: any::type_name::().into(), + }) + } } fn input_done_range_final(&mut self) -> Result<(), BinningggError> { - todo!() + self.maybe_emit_active(); + self.active_forward(self.active_beg.add_dt_nano(self.range.bin_len_dt_ns())); + Ok(()) } fn input_done_range_open(&mut self) -> Result<(), BinningggError> { - todo!() + self.non_fnl = true; + self.maybe_emit_active(); + self.active_forward(self.active_beg.add_dt_nano(self.range.bin_len_dt_ns())); + Ok(()) } fn output(&mut self) -> Result, BinningggError> { - todo!() + if self.out.len() == 0 { + Ok(None) + } else { + let ret = std::mem::replace(&mut self.out, ContainerBins::new()); + Ok(Some(Box::new(ret))) + } } } diff --git a/src/lib.rs b/src/lib.rs index 47a645c..5caa202 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,9 @@ use items_0::Events; use std::fmt; mod log { + #[cfg(not(test))] + pub use netpod::log::*; + #[cfg(test)] pub use netpod::log::*; }