diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml index 46148a7..87b4d1b 100644 --- a/crates/daqbuffer/Cargo.toml +++ b/crates/daqbuffer/Cargo.toml @@ -1,12 +1,12 @@ [package] name = "daqbuffer" -version = "0.5.3-aa.3" +version = "0.5.3-aa.4" authors = ["Dominik Werder "] edition = "2021" [dependencies] futures-util = "0.3.29" -bytes = "1.5.0" +bytes = "1.7.0" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" diff --git a/crates/items_2/src/binning/aggregator.rs b/crates/items_2/src/binning/aggregator.rs index d29d5a6..c05e306 100644 --- a/crates/items_2/src/binning/aggregator.rs +++ b/crates/items_2/src/binning/aggregator.rs @@ -1,5 +1,6 @@ use super::container_events::EventValueType; use core::fmt; +use netpod::log::*; use netpod::DtNano; use serde::Deserialize; use serde::Serialize; @@ -19,7 +20,7 @@ where fn new() -> Self; fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT); fn reset_for_new_bin(&mut self); - fn result_and_reset_for_new_bin(&mut self) -> EVT::AggTimeWeightOutputAvg; + fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg; } pub struct AggregatorNumeric { @@ -46,7 +47,7 @@ where fn ingest(&mut self, dt: DtNano, bl: DtNano, val: EVT) { let f = dt.ns() as f64 / bl.ns() as f64; - eprintln!("INGEST {} {:?}", f, val); + trace!("INGEST {} {:?}", f, val); self.sum += f * val.as_f64(); } @@ -54,11 +55,11 @@ where self.sum = 0.; } - fn result_and_reset_for_new_bin(&mut self) -> EVT::AggTimeWeightOutputAvg { - // fn result_and_reset_for_new_bin(&mut self) -> f64 { - let ret = self.sum.clone(); + fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> EVT::AggTimeWeightOutputAvg { + let sum = self.sum.clone(); + trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); self.sum = 0.; - ret + sum / filled_width_fraction as f64 } } @@ -69,7 +70,7 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn ingest(&mut self, dt: DtNano, bl: DtNano, val: f32) { let f = dt.ns() as f64 / bl.ns() as f64; - eprintln!("INGEST {} {}", f, val); + trace!("INGEST {} {}", f, val); self.sum += f * val as f64; } @@ -77,10 +78,11 @@ impl AggregatorTimeWeight for AggregatorNumeric { self.sum = 0.; } - fn result_and_reset_for_new_bin(&mut self) -> f64 { - let ret = self.sum.clone(); + fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f32 { + let sum = self.sum.clone() as f32; + trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); self.sum = 0.; - ret + sum / filled_width_fraction } } @@ -91,7 +93,7 @@ impl AggregatorTimeWeight for AggregatorNumeric { fn ingest(&mut self, dt: DtNano, bl: DtNano, val: u64) { let f = dt.ns() as f64 / bl.ns() as f64; - eprintln!("INGEST {} {}", f, val); + trace!("INGEST {} {}", f, val); self.sum += f * val as f64; } @@ -99,10 +101,11 @@ impl AggregatorTimeWeight for AggregatorNumeric { self.sum = 0.; } - fn result_and_reset_for_new_bin(&mut self) -> f64 { - let ret = self.sum.clone(); + fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> f64 { + let sum = self.sum.clone(); + trace!("result_and_reset_for_new_bin sum {} {}", sum, filled_width_fraction); self.sum = 0.; - ret + sum / filled_width_fraction as f64 } } diff --git a/crates/items_2/src/binning/container_bins.rs b/crates/items_2/src/binning/container_bins.rs index 495e6f4..5e38238 100644 --- a/crates/items_2/src/binning/container_bins.rs +++ b/crates/items_2/src/binning/container_bins.rs @@ -40,6 +40,59 @@ pub struct BinSingle { pub max: EVT, pub avg: f32, pub lst: EVT, + pub fnl: bool, +} + +#[derive(Debug, Clone)] +pub struct BinRef<'a, EVT> +where + EVT: EventValueType, +{ + pub ts1: TsNano, + pub ts2: TsNano, + pub cnt: u64, + pub min: &'a EVT, + pub max: &'a EVT, + pub avg: &'a EVT::AggTimeWeightOutputAvg, + pub lst: &'a EVT, + pub fnl: bool, +} + +pub struct IterDebug<'a, EVT> +where + EVT: EventValueType, +{ + bins: &'a ContainerBins, + ix: usize, + len: usize, +} + +impl<'a, EVT> Iterator for IterDebug<'a, EVT> +where + EVT: EventValueType, +{ + type Item = BinRef<'a, EVT>; + + fn next(&mut self) -> Option { + if self.ix < self.bins.len() && self.ix < self.len { + let b = &self.bins; + let i = self.ix; + self.ix += 1; + let ret = BinRef { + ts1: b.ts1s[i], + ts2: b.ts2s[i], + cnt: b.cnts[i], + min: &b.mins[i], + max: &b.maxs[i], + avg: &b.avgs[i], + lst: &b.lsts[i], + fnl: b.fnls[i], + }; + Some(ret) + } else { + None + } + } } #[derive(Clone, Serialize, Deserialize)] @@ -54,6 +107,7 @@ where maxs: VecDeque, avgs: VecDeque, lsts: VecDeque, + fnls: VecDeque, } impl ContainerBins @@ -73,6 +127,7 @@ where maxs: VecDeque::new(), avgs: VecDeque::new(), lsts: VecDeque::new(), + fnls: VecDeque::new(), } } @@ -98,6 +153,10 @@ where self.ts2s.back().map(|&x| x) } + pub fn cnts_iter(&self) -> std::collections::vec_deque::Iter { + self.cnts.iter() + } + pub fn len_before(&self, end: TsNano) -> usize { let pp = self.ts2s.partition_point(|&x| x <= end); assert!(pp <= self.len(), "len_before pp {} len {}", pp, self.len()); @@ -127,6 +186,7 @@ where max: EVT, avg: EVT::AggTimeWeightOutputAvg, lst: EVT, + fnl: bool, ) { self.ts1s.push_back(ts1); self.ts2s.push_back(ts2); @@ -135,6 +195,15 @@ where self.maxs.push_back(max); self.avgs.push_back(avg); self.lsts.push_back(lst); + self.fnls.push_back(fnl); + } + + pub fn iter_debug(&self) -> IterDebug { + IterDebug { + bins: self, + ix: 0, + len: self.len(), + } } } @@ -146,12 +215,13 @@ where let self_name = any::type_name::(); write!( fmt, - "{self_name} {{ len: {:?}, ts1s: {:?}, ts2s: {:?}, cnts: {:?}, avgs {:?} }}", + "{self_name} {{ len: {:?}, ts1s: {:?}, ts2s: {:?}, cnts: {:?}, avgs {:?}, fnls {:?} }}", self.len(), VecPreview::new(&self.ts1s), VecPreview::new(&self.ts2s), VecPreview::new(&self.cnts), VecPreview::new(&self.avgs), + VecPreview::new(&self.fnls), ) } } diff --git a/crates/items_2/src/binning/container_events.rs b/crates/items_2/src/binning/container_events.rs index 6b1ab93..58fbadb 100644 --- a/crates/items_2/src/binning/container_events.rs +++ b/crates/items_2/src/binning/container_events.rs @@ -56,7 +56,7 @@ where impl EventValueType for f32 { type Container = VecDeque; type AggregatorTimeWeight = AggregatorNumeric; - type AggTimeWeightOutputAvg = f64; + type AggTimeWeightOutputAvg = f32; fn identity_sum() -> Self { 0. diff --git a/crates/items_2/src/binning/test/events00.rs b/crates/items_2/src/binning/test/events00.rs index 578380f..9ce8f89 100644 --- a/crates/items_2/src/binning/test/events00.rs +++ b/crates/items_2/src/binning/test/events00.rs @@ -1,17 +1,158 @@ +use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::ContainerEvents; use crate::binning::timeweight::timeweight_events::BinnedEventsTimeweight; use err::thiserror; use err::ThisError; +use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::BinnedRange; use netpod::DtMs; use netpod::EnumVariant; use netpod::TsNano; +use std::collections::VecDeque; #[derive(Debug, ThisError)] #[cstm(name = "Error")] enum Error { Timeweight(#[from] crate::binning::timeweight::timeweight_events::Error), + AssertMsg(String), +} + +// fn prepare_data_with_cuts(beg_ms: u64, cuts: VecDeque) -> VecDeque> { +// let beg = TsNano::from_ms(beg_ms); +// let end = TsNano::from_ms(120); +// let mut cut_next = cuts.pop_front().unwrap_or(u64::MAX); +// let mut ret = VecDeque::new(); +// let ivl = DtMs::from_ms_u64(x) +// } + +fn pu(c: &mut ContainerEvents, ts_ms: u64, val: f32) +// where +// C: AsMut>, +// C: std::borrow::BorrowMut>, +{ + c.push_back(TsNano::from_ms(ts_ms), val); +} + +trait IntoVecDequeU64 { + fn into_vec_deque_u64(self) -> VecDeque; +} + +impl IntoVecDequeU64 for &str { + fn into_vec_deque_u64(self) -> VecDeque { + self.split_ascii_whitespace().map(|x| x.parse().unwrap()).collect() + } +} +trait IntoVecDequeF32 { + fn into_vec_deque_f32(self) -> VecDeque; +} + +impl IntoVecDequeF32 for &str { + fn into_vec_deque_f32(self) -> VecDeque { + self.split_ascii_whitespace().map(|x| x.parse().unwrap()).collect() + } +} + +fn exp_u64<'a>( + vals: impl Iterator, + exps: impl Iterator, + tag: &str, +) -> Result<(), Error> { + let mut it_a = vals; + let mut it_b = exps; + loop { + let a = it_a.next(); + let b = it_b.next(); + if a.is_none() && b.is_none() { + break; + } + if let (Some(val), Some(exp)) = (a, b) { + if val != exp { + return Err(Error::AssertMsg(format!( + "{tag} expect close value {} vs {}", + val, exp + ))); + } + } else { + return Err(Error::AssertMsg(format!("{tag} len mismatch"))); + } + } + Ok(()) +} + +fn exp_cnts(bins: &ContainerBins, exps: impl IntoVecDequeU64) -> Result<(), Error> { + exp_u64(bins.cnts_iter(), exps.into_vec_deque_u64().iter(), "exp_cnts") +} + +fn exp_avgs(bins: &ContainerBins, exps: impl IntoVecDequeF32) -> Result<(), Error> { + let exps = exps.into_vec_deque_f32(); + let mut it_a = bins.iter_debug(); + let mut it_b = exps.iter(); + loop { + let a = it_a.next(); + let b = it_b.next(); + if a.is_none() && b.is_none() { + break; + } + if let (Some(a), Some(&exp)) = (a, b) { + let val = *a.avg as f32; + if netpod::f32_close(val, exp) == false { + return Err(Error::AssertMsg(format!("expect close value {} vs {}", val, exp))); + } + } else { + return Err(Error::AssertMsg(format!( + "len mismatch {} vs {}", + bins.len(), + exps.len() + ))); + } + } + Ok(()) +} + +#[test] +fn test_bin_events_f32_simple_with_before_00() -> Result<(), Error> { + let beg = TsNano::from_ms(110); + let end = TsNano::from_ms(120); + let nano_range = NanoRange { + beg: beg.ns(), + end: end.ns(), + }; + let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); + let mut binner = BinnedEventsTimeweight::new(range); + let mut evs = ContainerEvents::::new(); + evs.push_back(TsNano::from_ms(103), 2.0); + binner.ingest(evs)?; + binner.input_done_range_final()?; + let bins = binner.output(); + assert_eq!(bins.len(), 1); + let bins = binner.output(); + assert_eq!(bins.len(), 0); + Ok(()) +} + +#[test] +fn test_bin_events_f32_simple_with_before_01() -> Result<(), Error> { + let beg = TsNano::from_ms(110); + let end = TsNano::from_ms(130); + let nano_range = NanoRange { + beg: beg.ns(), + end: end.ns(), + }; + let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); + let mut binner = BinnedEventsTimeweight::new(range); + let mut evs = ContainerEvents::::new(); + let em = &mut evs; + pu(em, 103, 2.0); + binner.ingest(evs)?; + binner.input_done_range_final()?; + let bins = binner.output(); + assert_eq!(bins.len(), 2); + exp_cnts(&bins, "0 0")?; + exp_avgs(&bins, "2.00 2.00")?; + let bins = binner.output(); + assert_eq!(bins.len(), 0); + Ok(()) } #[test] @@ -25,8 +166,27 @@ fn test_bin_events_f32_simple_00() -> Result<(), Error> { let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); let mut binner = BinnedEventsTimeweight::new(range); let mut evs = ContainerEvents::::new(); - evs.push_back(TsNano::from_ms(103), 2.0); + let em = &mut evs; + pu(em, 100, 2.0); + pu(em, 104, 2.4); binner.ingest(evs)?; + let mut evs = ContainerEvents::::new(); + let em = &mut evs; + pu(em, 111, 1.0); + pu(em, 112, 1.2); + pu(em, 113, 1.4); + binner.ingest(evs)?; + binner.input_done_range_open()?; + let bins = binner.output(); + trace!("{bins:?}"); + for b in bins.iter_debug() { + trace!("{b:?}"); + } + assert_eq!(bins.len(), 2); + exp_cnts(&bins, "2 3")?; + exp_avgs(&bins, "2.24 1.5333")?; + let bins = binner.output(); + assert_eq!(bins.len(), 0); Ok(()) } @@ -41,14 +201,62 @@ fn test_bin_events_f32_simple_01() -> Result<(), Error> { let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); let mut binner = BinnedEventsTimeweight::new(range); let mut evs = ContainerEvents::::new(); - evs.push_back(TsNano::from_ms(103), 2.0); - evs.push_back(TsNano::from_ms(104), 2.4); + let em = &mut evs; + pu(em, 102, 2.0); + pu(em, 104, 2.4); binner.ingest(evs)?; let mut evs = ContainerEvents::::new(); - evs.push_back(TsNano::from_ms(111), 1.0); - evs.push_back(TsNano::from_ms(112), 1.2); - evs.push_back(TsNano::from_ms(113), 1.4); + let em = &mut evs; + pu(em, 111, 1.0); + pu(em, 112, 1.2); + pu(em, 113, 1.4); binner.ingest(evs)?; + binner.input_done_range_open()?; + let bins = binner.output(); + trace!("{bins:?}"); + for b in bins.iter_debug() { + trace!("{b:?}"); + } + assert_eq!(bins.len(), 2); + exp_cnts(&bins, "2 3")?; + exp_avgs(&bins, "2.30 1.5333")?; + let bins = binner.output(); + assert_eq!(bins.len(), 0); + Ok(()) +} + +#[test] +fn test_bin_events_f32_small_range_final() -> Result<(), Error> { + let beg = TsNano::from_ms(100); + let end = TsNano::from_ms(120); + let nano_range = NanoRange { + beg: beg.ns(), + end: end.ns(), + }; + let range = BinnedRange::from_nano_range(nano_range, DtMs::from_ms_u64(10)); + let mut binner = BinnedEventsTimeweight::new(range); + let mut evs = ContainerEvents::::new(); + let em = &mut evs; + pu(em, 102, 2.0); + pu(em, 104, 2.4); + binner.ingest(evs)?; + let mut evs = ContainerEvents::::new(); + let em = &mut evs; + pu(em, 111, 1.0); + pu(em, 112, 1.2); + pu(em, 113, 1.4); + binner.ingest(evs)?; + binner.input_done_range_final()?; + let bins = binner.output(); + trace!("{bins:?}"); + for b in bins.iter_debug() { + trace!("{b:?}"); + } + assert_eq!(bins.len(), 2); + exp_cnts(&bins, "2 3")?; + exp_avgs(&bins, "2.30 1.44")?; + let bins = binner.output(); + assert_eq!(bins.len(), 0); Ok(()) } @@ -66,8 +274,8 @@ fn test_bin_events_enum_simple_range_final() -> Result<(), Error> { evs.push_back(TsNano::from_ms(103), EnumVariant::new(1, "one")); evs.push_back(TsNano::from_ms(104), EnumVariant::new(2, "two")); binner.ingest(evs)?; - binner.range_final()?; + binner.input_done_range_final()?; let bins = binner.output(); - eprintln!("{:?}", bins); + trace!("{:?}", bins); Ok(()) } diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs index 9dcd7df..a24a217 100644 --- a/crates/items_2/src/binning/timeweight/timeweight_events.rs +++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs @@ -19,7 +19,7 @@ use std::task::Context; use std::task::Poll; #[allow(unused)] -macro_rules! trace_ { ($($arg:tt)*) => ( if true { eprintln!($($arg)*); }) } +macro_rules! trace_ { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[allow(unused)] macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } @@ -48,6 +48,11 @@ macro_rules! trace_ingest_finish_bin { ($($arg:tt)*) => ( if true { trace_!($($a #[allow(unused)] macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +#[cold] +#[inline] +#[allow(unused)] +fn cold() {} + const DEBUG_CHECKS: bool = true; #[derive(Debug, ThisError)] @@ -67,6 +72,7 @@ pub enum Error { type MinMax = (EventSingle, EventSingle); +#[derive(Clone)] struct LstRef<'a, EVT>(&'a EventSingle); struct LstMut<'a, EVT>(&'a mut EventSingle); @@ -80,6 +86,7 @@ where active_end: TsNano, active_len: DtNano, filled_until: TsNano, + filled_width: DtNano, agg: ::AggregatorTimeWeight, } @@ -99,11 +106,13 @@ where } } let dt = ev.ts.delta(self.filled_until); + trace_ingest_event!("ingest_event_with_lst_gt_range_beg_agg dt {:?} ev {:?}", dt, ev); // TODO can the caller already take the value and replace it afterwards with the current value? // This fn could swap the value in lst and directly use it. // This would require that any call path does not mess with lst. // NOTE that this fn is also used during bin-cycle. self.agg.ingest(dt, self.active_len, lst.0.val.clone()); + self.filled_width = self.filled_width.add(dt); self.filled_until = ev.ts; } @@ -150,7 +159,7 @@ where ) -> Result<(), Error> { trace_ingest_event!("ingest_with_lst_gt_range_beg"); while let Some(ev) = evs.pop_front() { - trace_event_next!("ingest_with_lst_ge_range_beg {:?}", ev); + trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_with_lst_gt_range_beg"); if ev.ts <= self.active_beg { panic!("should never get here"); } @@ -171,7 +180,7 @@ where ) -> Result<(), Error> { trace_ingest_event!("ingest_with_lst_ge_range_beg"); while let Some(ev) = evs.pop_front() { - trace_event_next!("ingest_with_lst_ge_range_beg {:?}", ev); + trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_with_lst_ge_range_beg"); if ev.ts < self.active_beg { panic!("should never get here"); } @@ -200,6 +209,8 @@ where trace_ingest_event!("ingest_with_lst_minmax"); // TODO how to handle the min max? I don't take event data yet out of the container. if let Some(ts0) = evs.ts_first() { + trace_ingest_event!("EVENT POP FRONT ingest_with_lst_minmax"); + trace_ingest_event!("EVENT TIMESTAMP FRONT {:?} ingest_with_lst_minmax", ts0); if ts0 < self.active_beg { panic!("should never get here"); } else { @@ -212,21 +223,17 @@ where // PRECONDITION: filled_until < ts <= active_end fn fill_until(&mut self, ts: TsNano, lst: LstRef) { - trace_cycle!("fill_until ts {:?}", ts); let b = self; assert!(b.filled_until < ts); assert!(ts <= b.active_end); - b.agg.ingest(ts.delta(b.filled_until), b.active_len, lst.0.val.clone()); + let dt = ts.delta(b.filled_until); + trace_cycle!("fill_until ts {:?} dt {:?} lst {:?}", ts, dt, lst.0); + assert!(b.filled_until < ts); + assert!(ts <= b.active_end); + b.agg.ingest(dt, b.active_len, lst.0.val.clone()); + b.filled_width = b.filled_width.add(dt); b.filled_until = ts; } - - fn fill_remaining_if_space_left(&mut self, lst: LstRef) { - trace_cycle!("fill_remaining_if_space_left"); - let b = self; - if b.filled_until < b.active_end { - b.fill_until(b.active_end, lst); - } - } } struct InnerA @@ -270,7 +277,7 @@ where self.inner_b.ingest_with_lst_minmax(evs, lst, minmax) } else { if let Some(ev) = evs.pop_front() { - trace_event_next!("ingest_with_lst {:?}", ev); + trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_with_lst"); let beg = self.inner_b.active_beg; let end = self.inner_b.active_end; if ev.ts < beg { @@ -301,6 +308,53 @@ where } } } + + fn reset_01(&mut self, lst: LstRef) { + let selfname = "reset_01"; + let b = &mut self.inner_b; + trace_cycle!( + "{selfname} active_end {:?} filled_until {:?}", + b.active_end, + b.filled_until + ); + let div = b.active_len.ns(); + let old_end = b.active_end; + let ts1 = TsNano::from_ns(b.active_end.ns() / div * div); + assert!(ts1 == old_end); + b.active_beg = ts1; + b.active_end = ts1.add_dt_nano(b.active_len); + b.filled_until = ts1; + b.filled_width = DtNano::from_ns(0); + b.cnt = 0; + self.minmax = Some((lst.0.clone(), lst.0.clone())); + } + + fn push_out_and_reset(&mut self, lst: LstRef, range_final: bool, out: &mut ContainerBins) { + let selfname = "push_out_and_reset"; + // TODO there is not always good enough input to produce a meaningful bin. + // TODO can we always reset, and what exactly does reset mean here? + // TODO what logic can I save here? To output a bin I need to have min, max, lst. + let b = &mut self.inner_b; + let minmax = self.minmax.get_or_insert_with(|| { + trace_cycle!("{selfname} minmax not yet set"); + (lst.0.clone(), lst.0.clone()) + }); + { + let filled_width_fraction = b.filled_width.fraction_of(b.active_len); + let res = b.agg.result_and_reset_for_new_bin(filled_width_fraction); + out.push_back( + b.active_beg, + b.active_end, + b.cnt, + minmax.0.val.clone(), + minmax.1.val.clone(), + res, + lst.0.val.clone(), + range_final, + ); + } + self.reset_01(lst); + } } pub struct BinnedEventsTimeweight @@ -330,6 +384,7 @@ where active_end, active_len, filled_until: active_beg, + filled_width: DtNano::from_ns(0), agg: <::AggregatorTimeWeight as AggregatorTimeWeight>::new(), }, minmax: None, @@ -349,6 +404,7 @@ where trace_ingest_minmax!("ingest_event_without_lst"); self.inner_a.init_minmax(&ev); self.inner_a.inner_b.cnt += 1; + self.inner_a.inner_b.filled_until = ev.ts; } Ok(()) } @@ -356,7 +412,7 @@ where fn ingest_without_lst(&mut self, mut evs: ContainerEventsTakeUpTo) -> Result<(), Error> { if let Some(ev) = evs.pop_front() { - trace_event_next!("ingest_without_lst {:?}", ev); + trace_event_next!("EVENT POP FRONT {:?} {:30}", ev, "ingest_without_lst"); if ev.ts >= self.inner_a.inner_b.active_end { panic!("should never get here"); } else { @@ -390,52 +446,38 @@ where let b = &self.inner_a.inner_b; trace_cycle!("cycle_01 {:?} {:?}", ts, b.active_end); assert!(b.active_beg < ts); - let div = self.range.bin_len.ns(); + assert!(b.active_beg <= b.filled_until); + assert!(b.filled_until < ts); + assert!(b.filled_until <= b.active_end); + let div = b.active_len.ns(); if let Some(lst) = self.lst.as_ref() { + let lst = LstRef(lst); + let mut i = 0; loop { + i += 1; + assert!(i < 100000, "too many iterations"); let b = &self.inner_a.inner_b; - if b.filled_until >= ts { - break; - } - if ts >= b.active_end { - self.inner_a.inner_b.fill_remaining_if_space_left(LstRef(lst)); - let b = &mut self.inner_a.inner_b; - let minmax = self.inner_a.minmax.get_or_insert_with(|| { - trace_cycle!("cycle_01 minmax not yet set"); - (lst.clone(), lst.clone()) - }); - { - // TODO push bin to output. - let res = b.agg.result_and_reset_for_new_bin(); - self.out.push_back( - b.active_beg, - b.active_end, - b.cnt, - minmax.0.val.clone(), - minmax.1.val.clone(), - res, - lst.val.clone(), - ); + if ts > b.filled_until { + if ts >= b.active_end { + if b.filled_until < b.active_end { + self.inner_a.inner_b.fill_until(b.active_end, lst.clone()); + } + self.inner_a.push_out_and_reset(lst.clone(), true, &mut self.out); + } else { + self.inner_a.inner_b.fill_until(ts, lst.clone()); } - trace_cycle!("cycle_01 filled up to {:?} emit and reset", b.active_end); - let old_end = b.active_end; - let ts1 = TsNano::from_ns(b.active_end.ns() / div * div); - assert!(ts1 == old_end); - b.active_beg = ts1; - b.active_end = ts1.add_dt_nano(b.active_len); - b.filled_until = ts1; - b.cnt = 0; - self.inner_a.minmax = Some((lst.clone(), lst.clone())); } else { - self.inner_a.inner_b.fill_until(ts, LstRef(lst)); + break; } } } else { + // TODO merge with the other reset let ts1 = TsNano::from_ns(ts.ns() / div * div); let b = &mut self.inner_a.inner_b; b.active_beg = ts1; b.active_end = ts1.add_dt_nano(b.active_len); b.filled_until = ts1; + b.filled_width = DtNano::from_ns(0); b.cnt = 0; b.agg.reset_for_new_bin(); assert!(self.inner_a.minmax.is_none()); @@ -443,6 +485,17 @@ where } } + fn cycle_02(&mut self) { + let b = &self.inner_a.inner_b; + trace_cycle!("cycle_02 {:?}", b.active_end); + if let Some(lst) = self.lst.as_ref() { + let lst = LstRef(lst); + self.inner_a.push_out_and_reset(lst, false, &mut self.out); + } else { + // there is nothing we can produce + } + } + pub fn ingest(&mut self, mut evs_all: ContainerEvents) -> Result<(), Error> { // It is this type's task to find and store the one-before event. // We then pass it to the aggregation. @@ -457,16 +510,15 @@ where evs_all.verify()?; loop { - // How to handle transition to the next bin? - // How to handle to not emit bins until at least some partially filled bin is encountered? break if let Some(ts) = evs_all.ts_first() { + trace_ingest_event!("EVENT TIMESTAMP FRONT {:?} ingest", ts); let b = &mut self.inner_a.inner_b; if ts >= self.range.nano_end() { return Err(Error::EventAfterRange); } if ts >= b.active_end { trace_cycle!("bin edge boundary {:?}", b.active_end); - assert!(b.filled_until < b.active_beg); + assert!(b.filled_until < b.active_end, "{} < {}", b.filled_until, b.active_end); self.cycle_01(ts); } let n1 = evs_all.len(); @@ -496,12 +548,18 @@ where Ok(()) } - pub fn range_final(&mut self) -> Result<(), Error> { - trace_cycle!("range_final"); + pub fn input_done_range_final(&mut self) -> Result<(), Error> { + trace_cycle!("input_done_range_final"); self.cycle_01(self.range.nano_end()); Ok(()) } + pub fn input_done_range_open(&mut self) -> Result<(), Error> { + trace_cycle!("input_done_range_open"); + self.cycle_02(); + Ok(()) + } + pub fn output(&mut self) -> ContainerBins { ::core::mem::replace(&mut self.out, ContainerBins::new()) } diff --git a/crates/items_2/src/binning/valuetype.rs b/crates/items_2/src/binning/valuetype.rs index f6f25cb..de626a1 100644 --- a/crates/items_2/src/binning/valuetype.rs +++ b/crates/items_2/src/binning/valuetype.rs @@ -68,10 +68,13 @@ impl AggregatorTimeWeight for EnumVariantAggregatorTimeWeight { self.sum = f32::identity_sum(); } - fn result_and_reset_for_new_bin(&mut self) -> ::AggTimeWeightOutputAvg { + fn result_and_reset_for_new_bin( + &mut self, + filled_width_fraction: f32, + ) -> ::AggTimeWeightOutputAvg { let ret = self.sum.clone(); self.sum = f32::identity_sum(); - ret + ret / filled_width_fraction } } diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index 3dd123f..6df1394 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -50,12 +50,12 @@ pub mod log_macros { } } -pub mod log { +pub mod log2 { pub use tracing::{self, event, span, Level}; pub use tracing::{debug, error, info, trace, warn}; } -pub mod log2 { +pub mod log { pub use crate::{debug, error, info, trace, warn}; pub use tracing::{self, event, span, Level}; } @@ -1698,6 +1698,14 @@ impl DtNano { pub fn to_i64(&self) -> i64 { self.0 as i64 } + + pub fn add(self, rhs: Self) -> Self { + Self(self.0 + rhs.0) + } + + pub fn fraction_of(self, rhs: Self) -> f32 { + self.0 as f32 / rhs.0 as f32 + } } impl fmt::Display for DtNano {