From 0d1132da4cd7d231e316ed31e3b28867b877f194 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 2 Oct 2024 15:28:11 +0200 Subject: [PATCH] WIP --- crates/items_2/src/binning/container_bins.rs | 8 + crates/items_2/src/binning/test/events00.rs | 161 ++++++++++++++++-- .../binning/timeweight/timeweight_events.rs | 40 +++-- 3 files changed, 177 insertions(+), 32 deletions(-) diff --git a/crates/items_2/src/binning/container_bins.rs b/crates/items_2/src/binning/container_bins.rs index 5e38238..5bc1e12 100644 --- a/crates/items_2/src/binning/container_bins.rs +++ b/crates/items_2/src/binning/container_bins.rs @@ -157,6 +157,14 @@ where self.cnts.iter() } + pub fn mins_iter(&self) -> std::collections::vec_deque::Iter { + self.mins.iter() + } + + pub fn maxs_iter(&self) -> std::collections::vec_deque::Iter { + self.maxs.iter() + } + pub fn len_before(&self, end: TsNano) -> usize { let pp = self.ts2s.partition_point(|&x| x <= end); assert!(pp <= self.len(), "len_before pp {} len {}", pp, self.len()); diff --git a/crates/items_2/src/binning/test/events00.rs b/crates/items_2/src/binning/test/events00.rs index 51110ba..ba19c03 100644 --- a/crates/items_2/src/binning/test/events00.rs +++ b/crates/items_2/src/binning/test/events00.rs @@ -60,31 +60,71 @@ fn exp_u64<'a>( ) -> Result<(), Error> { let mut it_a = vals; let mut it_b = exps; + let mut i = 0; loop { let a = it_a.next(); let b = it_b.next(); if a.is_none() && b.is_none() { break; } - if let (Some(val), Some(exp)) = (a, b) { + if let (Some(&val), Some(&exp)) = (a, b) { if val != exp { - return Err(Error::AssertMsg(format!("{tag} expect value {} vs {}", val, exp))); + return Err(Error::AssertMsg(format!("{tag} val {} exp {} i {}", val, exp, i))); } } else { return Err(Error::AssertMsg(format!("{tag} len mismatch"))); } + i += 1; } Ok(()) } +fn exp_f32<'a>( + vals: impl Iterator, + exps: impl Iterator, + tag: &str, +) -> Result<(), Error> { + let mut it_a = vals; + let mut it_b = exps; + let mut i = 0; + loop { + let a = it_a.next(); + let b = it_b.next(); + if a.is_none() && b.is_none() { + break; + } + if let (Some(&val), Some(&exp)) = (a, b) { + if netpod::f32_close(val, exp) == false { + return Err(Error::AssertMsg(format!("{tag} val {} exp {} i {}", val, exp, i))); + } + } else { + return Err(Error::AssertMsg(format!("{tag} len mismatch"))); + } + i += 1; + } + Ok(()) +} + +#[cfg(test)] fn exp_cnts(bins: &ContainerBins, exps: impl IntoVecDequeU64) -> Result<(), Error> { exp_u64(bins.cnts_iter(), exps.into_vec_deque_u64().iter(), "exp_cnts") } +#[cfg(test)] +fn exp_mins(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { + exp_f32(bins.mins_iter(), exps.into_vec_deque_f32().iter(), "exp_mins") +} + +#[cfg(test)] +fn exp_maxs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { + exp_f32(bins.maxs_iter(), exps.into_vec_deque_f32().iter(), "exp_maxs") +} + fn exp_avgs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { let exps = exps.into_vec_deque_f32(); let mut it_a = bins.iter_debug(); let mut it_b = exps.iter(); + let mut i = 0; loop { let a = it_a.next(); let b = it_b.next(); @@ -94,7 +134,7 @@ fn exp_avgs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), if let (Some(a), Some(&exp)) = (a, b) { let val = *a.avg as f32; if netpod::f32_close(val, exp) == false { - return Err(Error::AssertMsg(format!("expect value {} vs {}", val, exp))); + return Err(Error::AssertMsg(format!("exp_avgs val {} exp {} i {}", val, exp, i))); } } else { return Err(Error::AssertMsg(format!( @@ -103,6 +143,7 @@ fn exp_avgs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), exps.len() ))); } + i += 1; } Ok(()) } @@ -122,14 +163,17 @@ fn test_bin_events_f32_simple_with_before_00() -> Result<(), Error> { binner.ingest(evs)?; binner.input_done_range_final()?; let bins = binner.output(); - assert_eq!(bins.len(), 1); + exp_cnts(&bins, "0")?; + exp_mins(&bins, "2.")?; + exp_maxs(&bins, "2.")?; + exp_avgs(&bins, "2.")?; let bins = binner.output(); assert_eq!(bins.len(), 0); Ok(()) } #[test] -fn test_bin_events_f32_simple_with_before_01() -> Result<(), Error> { +fn test_bin_events_f32_simple_with_before_01_range_final() -> Result<(), Error> { let beg = TsNano::from_ms(110); let end = TsNano::from_ms(130); let nano_range = NanoRange { @@ -144,9 +188,10 @@ fn test_bin_events_f32_simple_with_before_01() -> Result<(), Error> { binner.ingest(evs)?; binner.input_done_range_final()?; let bins = binner.output(); - assert_eq!(bins.len(), 2); exp_cnts(&bins, "0 0")?; - exp_avgs(&bins, "2.00 2.00")?; + exp_mins(&bins, "2. 2.")?; + exp_maxs(&bins, "2. 2.")?; + exp_avgs(&bins, "2. 2.")?; let bins = binner.output(); assert_eq!(bins.len(), 0); Ok(()) @@ -175,12 +220,12 @@ fn test_bin_events_f32_simple_00() -> Result<(), Error> { binner.ingest(evs)?; binner.input_done_range_open()?; let bins = binner.output(); - trace!("{bins:?}"); for b in bins.iter_debug() { trace!("{b:?}"); } - assert_eq!(bins.len(), 2); - exp_cnts(&bins, "2 3")?; + exp_cnts(&bins, "2 3")?; + exp_mins(&bins, "2. 1.")?; + exp_maxs(&bins, "2.4 2.4")?; exp_avgs(&bins, "2.24 1.5333")?; let bins = binner.output(); assert_eq!(bins.len(), 0); @@ -210,12 +255,13 @@ fn test_bin_events_f32_simple_01() -> Result<(), Error> { binner.ingest(evs)?; binner.input_done_range_open()?; let bins = binner.output(); - trace!("{bins:?}"); for b in bins.iter_debug() { trace!("{b:?}"); } assert_eq!(bins.len(), 2); - exp_cnts(&bins, "2 3")?; + exp_cnts(&bins, "2 3")?; + exp_mins(&bins, "2. 1.")?; + exp_maxs(&bins, "2.4 2.4")?; exp_avgs(&bins, "2.30 1.5333")?; let bins = binner.output(); assert_eq!(bins.len(), 0); @@ -245,12 +291,13 @@ fn test_bin_events_f32_small_range_final() -> Result<(), Error> { binner.ingest(evs)?; binner.input_done_range_final()?; let bins = binner.output(); - trace!("{bins:?}"); for b in bins.iter_debug() { trace!("{b:?}"); } assert_eq!(bins.len(), 2); exp_cnts(&bins, "2 3")?; + exp_mins(&bins, "2. 1.")?; + exp_maxs(&bins, "2.4 2.4")?; exp_avgs(&bins, "2.30 1.44")?; let bins = binner.output(); assert_eq!(bins.len(), 0); @@ -287,13 +334,14 @@ fn test_bin_events_f32_small_intermittent_silence_range_open() -> Result<(), Err binner.ingest(evs)?; binner.input_done_range_open()?; let bins = binner.output(); - trace!("{bins:?}"); for b in bins.iter_debug() { trace!("{b:?}"); } assert_eq!(bins.len(), 5); exp_cnts(&bins, "2 3 0 0 2")?; - exp_avgs(&bins, "2.30 1.44 1.4 1.4 1.375")?; + exp_mins(&bins, "2.0 1.0 1.4 1.4 1.2")?; + exp_maxs(&bins, "2.4 2.4 1.4 1.4 1.4")?; + exp_avgs(&bins, "2.30 1.44 1.4 1.4 1.375")?; let bins = binner.output(); assert_eq!(bins.len(), 0); Ok(()) @@ -329,18 +377,96 @@ fn test_bin_events_f32_small_intermittent_silence_range_final() -> Result<(), Er binner.ingest(evs)?; binner.input_done_range_final()?; let bins = binner.output(); - trace!("{bins:?}"); for b in bins.iter_debug() { trace!("{b:?}"); } - assert_eq!(bins.len(), 5); exp_cnts(&bins, "2 3 0 0 2")?; + exp_mins(&bins, "2.0 1.0 1.4 1.4 1.2")?; + exp_maxs(&bins, "2.4 2.4 1.4 1.4 1.4")?; exp_avgs(&bins, "2.30 1.44 1.4 1.4 1.34")?; let bins = binner.output(); assert_eq!(bins.len(), 0); Ok(()) } +#[test] +fn test_bin_events_f32_small_intermittent_silence_minmax_no_edge_range_final() -> Result<(), Error> { + let beg = TsNano::from_ms(110); + let end = TsNano::from_ms(120); + let nano_range = NanoRange { + beg: beg.ns(), + end: end.ns(), + }; + let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); + let mut binner = BinnedEventsTimeweight::new(range); + let mut evs = ContainerEvents::::new(); + let em = &mut evs; + pu(em, 109, 50.); + binner.ingest(evs)?; + let mut evs = ContainerEvents::::new(); + let em = &mut evs; + pu(em, 111, 40.); + // pu(em, 112, 1.2); + // binner.ingest(evs)?; + // let mut evs = ContainerEvents::::new(); + // let em = &mut evs; + // pu(em, 113, 1.4); + // pu(em, 120, 1.4); + // pu(em, 146, 1.3); + // pu(em, 148, 1.2); + binner.ingest(evs)?; + binner.input_done_range_final()?; + let bins = binner.output(); + for b in bins.iter_debug() { + trace!("{b:?}"); + } + exp_cnts(&bins, "1")?; + exp_mins(&bins, "40.")?; + exp_maxs(&bins, "50.")?; + let bins = binner.output(); + assert_eq!(bins.len(), 0); + Ok(()) +} + +#[test] +fn test_bin_events_f32_small_intermittent_silence_minmax_edge_range_final() -> Result<(), Error> { + let beg = TsNano::from_ms(110); + let end = TsNano::from_ms(120); + let nano_range = NanoRange { + beg: beg.ns(), + end: end.ns(), + }; + let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); + let mut binner = BinnedEventsTimeweight::new(range); + let mut evs = ContainerEvents::::new(); + let em = &mut evs; + pu(em, 109, 50.); + binner.ingest(evs)?; + let mut evs = ContainerEvents::::new(); + let em = &mut evs; + pu(em, 110, 40.); + // pu(em, 112, 1.2); + // binner.ingest(evs)?; + // let mut evs = ContainerEvents::::new(); + // let em = &mut evs; + // pu(em, 113, 1.4); + // pu(em, 120, 1.4); + // pu(em, 146, 1.3); + // pu(em, 148, 1.2); + binner.ingest(evs)?; + binner.input_done_range_final()?; + let bins = binner.output(); + for b in bins.iter_debug() { + trace!("{b:?}"); + } + exp_cnts(&bins, "1")?; + exp_mins(&bins, "40.")?; + exp_maxs(&bins, "40.")?; + let bins = binner.output(); + assert_eq!(bins.len(), 0); + Ok(()) +} + #[test] fn test_bin_events_enum_simple_range_final() -> Result<(), Error> { let beg = TsNano::from_ms(100); @@ -357,6 +483,5 @@ fn test_bin_events_enum_simple_range_final() -> Result<(), Error> { binner.ingest(evs)?; binner.input_done_range_final()?; let bins = binner.output(); - trace!("{:?}", bins); Ok(()) } diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs index a24a217..c29be6e 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs @@ -117,9 +117,11 @@ where } fn ingest_event_with_lst_gt_range_beg_2(&mut self, ev: EventSingle, lst: LstMut) -> Result<(), Error> { - trace_ingest_event!("ingest_event_with_lst_gt_range_beg_2"); + let selfname = "ingest_event_with_lst_gt_range_beg_2"; + trace_ingest_event!("{selfname}"); self.ingest_event_with_lst_gt_range_beg_agg(ev.clone(), LstRef(lst.0)); InnerA::apply_lst_after_event_handled(ev, lst); + // self.cnt += 1; Ok(()) } @@ -129,7 +131,8 @@ where lst: LstMut, minmax: &mut MinMax, ) -> Result<(), Error> { - trace_ingest_event!("ingest_event_with_lst_gt_range_beg"); + let selfname = "ingest_event_with_lst_gt_range_beg"; + trace_ingest_event!("{selfname}"); // TODO if the event is exactly on the current bin first edge, then there is no contribution to the avg yet // and I must initialize the min/max with the current event. InnerA::apply_min_max(&ev, minmax); @@ -268,18 +271,21 @@ where fn init_minmax_with_lst(&mut self, ev: &EventSingle, lst: LstRef) { trace_ingest_minmax!("init_minmax_with_lst {:?} {:?}", ev, lst.0); - self.minmax = Some((lst.0.clone(), lst.0.clone())); - Self::apply_min_max(ev, self.minmax.as_mut().unwrap()); + let minmax = self.minmax.insert((lst.0.clone(), lst.0.clone())); + Self::apply_min_max(ev, minmax); } fn ingest_with_lst(&mut self, mut evs: ContainerEventsTakeUpTo, lst: LstMut) -> Result<(), Error> { + let selfname = "ingest_with_lst"; + trace_ingest_container!("{selfname}"); + let b = &mut self.inner_b; if let Some(minmax) = self.minmax.as_mut() { - self.inner_b.ingest_with_lst_minmax(evs, lst, minmax) + b.ingest_with_lst_minmax(evs, lst, minmax) } else { if let Some(ev) = evs.pop_front() { - trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_with_lst"); - let beg = self.inner_b.active_beg; - let end = self.inner_b.active_end; + trace_event_next!("EVENT POP FRONT {:?} {selfname:30}", ev); + let beg = b.active_beg; + let end = b.active_end; if ev.ts < beg { panic!("should never get here"); } else if ev.ts >= end { @@ -288,16 +294,20 @@ where if ev.ts == beg { self.init_minmax(&ev); InnerA::apply_lst_after_event_handled(ev, lst); + let b = &mut self.inner_b; + b.cnt += 1; Ok(()) } else { self.init_minmax_with_lst(&ev, LstRef(lst.0)); + let b = &mut self.inner_b; if let Some(minmax) = self.minmax.as_mut() { if ev.ts == beg { panic!("logic error, is handled before"); } else { - self.inner_b.ingest_event_with_lst_gt_range_beg_2(ev, LstMut(lst.0))?; + b.ingest_event_with_lst_gt_range_beg_2(ev, LstMut(lst.0))?; } - self.inner_b.ingest_with_lst_minmax(evs, lst, minmax) + b.cnt += 1; + b.ingest_with_lst_minmax(evs, lst, minmax) } else { Err(Error::NoMinMaxAfterInit) } @@ -395,16 +405,18 @@ where } fn ingest_event_without_lst(&mut self, ev: EventSingle) -> Result<(), Error> { - if ev.ts >= self.inner_a.inner_b.active_end { + let b = &self.inner_a.inner_b; + if ev.ts >= b.active_end { panic!("should never get here"); } else { trace_ingest_init_lst!("ingest_event_without_lst set lst {:?}", ev); self.lst = Some(ev.clone()); - if ev.ts >= self.inner_a.inner_b.active_beg { + if ev.ts >= b.active_beg { trace_ingest_minmax!("ingest_event_without_lst"); self.inner_a.init_minmax(&ev); - self.inner_a.inner_b.cnt += 1; - self.inner_a.inner_b.filled_until = ev.ts; + let b = &mut self.inner_a.inner_b; + b.cnt += 1; + b.filled_until = ev.ts; } Ok(()) }