diff --git a/src/binning/container/bins.rs b/src/binning/container/bins.rs index cd309fb..700bf42 100644 --- a/src/binning/container/bins.rs +++ b/src/binning/container/bins.rs @@ -1,19 +1,23 @@ use crate::binning::container_events::PartialOrdEvtA; use items_0::vecpreview::PreviewRange; use netpod::DtNano; +use netpod::log; use serde::Deserialize; use serde::Serialize; use std::collections::VecDeque; use std::fmt; use std::ops::Range; +macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); }) } +macro_rules! trace_result { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); }) } + pub trait AggBinValTw: fmt::Debug + Send where BVT: BinAggedType, { - fn new() -> Self; - fn ingest(&mut self, dt: DtNano, bl: DtNano, cnt: u64, val: BVT); - fn result(&mut self, filled_width_fraction: f32) -> BVT; + fn new(binlen: DtNano) -> Self; + fn ingest(&mut self, dt: DtNano, val: BVT); + fn result(&self) -> BVT; fn reset_for_new_bin(&mut self); } @@ -88,52 +92,94 @@ impl BinAggedType for f64 { #[derive(Debug)] pub struct AggBinValTwF32 { + binlen: DtNano, + filled: DtNano, sum: f32, } impl AggBinValTw for AggBinValTwF32 { - fn new() -> Self { - Self { sum: 0. } + fn new(binlen: DtNano) -> Self { + Self { + binlen, + filled: DtNano::from_ns(0), + sum: 0., + } } - fn ingest(&mut self, dt: DtNano, bl: DtNano, _cnt: u64, val: f32) { - let f = dt.ns() as f32 / bl.ns() as f32; + fn ingest(&mut self, dt: DtNano, val: f32) { + type FT = f32; + let f = dt.ns() as FT / self.binlen.ns() as FT; + trace_ingest!( + "ingest dt {} s val {:.4} f {:.4}", + dt.ms_u64() / 1000, + val, + f + ); + self.filled = self.filled.add(dt); self.sum += f * val; } - fn result(&mut self, filled_width_fraction: f32) -> f32 { - let ret = self.sum.clone() / filled_width_fraction; - >::reset_for_new_bin(self); + fn result(&self) -> f32 { + type FT = f32; + let ret = if self.filled.ns() == 0 { + return FT::NAN; + } else { + let f = self.binlen.ms_u64() as FT / self.filled.ms_u64() as FT; + trace_result!("result sum {:.4} f {:.4}", self.sum, f); + self.sum.clone() * f + }; ret } fn reset_for_new_bin(&mut self) { + self.filled = DtNano::from_ns(0); self.sum = 0.; } } #[derive(Debug)] pub struct AggBinValTwF64 { + binlen: DtNano, + filled: DtNano, sum: f64, } impl AggBinValTw for AggBinValTwF64 { - fn new() -> Self { - Self { sum: 0. } + fn new(binlen: DtNano) -> Self { + Self { + binlen, + filled: DtNano::from_ns(0), + sum: 0., + } } - fn ingest(&mut self, dt: DtNano, bl: DtNano, _cnt: u64, val: f64) { - let f = dt.ns() as f32 / bl.ns() as f32; - self.sum += f as f64 * val; + fn ingest(&mut self, dt: DtNano, val: f64) { + type FT = f64; + let f = dt.ns() as FT / self.binlen.ns() as FT; + trace_ingest!( + "ingest dt {} s val {:.4} f {:.4}", + dt.ms_u64() / 1000, + val, + f + ); + self.filled = self.filled.add(dt); + self.sum += f * val; } - fn result(&mut self, filled_width_fraction: f32) -> f64 { - let ret = self.sum.clone() / filled_width_fraction as f64; - >::reset_for_new_bin(self); + fn result(&self) -> f64 { + type FT = f64; + let ret = if self.filled.ns() == 0 { + return FT::NAN; + } else { + let f = self.binlen.ms_u64() as FT / self.filled.ms_u64() as FT; + trace_result!("result sum {:.4} f {:.4}", self.sum, f); + self.sum.clone() * f + }; ret } fn reset_for_new_bin(&mut self) { + self.filled = DtNano::from_ns(0); self.sum = 0.; } } diff --git a/src/binning/timeweight/timeweight_bins.rs b/src/binning/timeweight/timeweight_bins.rs index f3657d8..6fa406a 100644 --- a/src/binning/timeweight/timeweight_bins.rs +++ b/src/binning/timeweight/timeweight_bins.rs @@ -8,7 +8,9 @@ use items_0::timebin::BinnedBinsTimeweightTrait; use items_0::timebin::BinningggError; use items_0::timebin::BinsBoxed; use netpod::BinnedRange; +use netpod::DtMs; use netpod::TsNano; +use netpod::range::evrange::NanoRange; use serde::Serialize; use std::any; @@ -21,6 +23,7 @@ macro_rules! trace_emit { ($($arg:tt)*) => ( if false { log::trace!("BIN EMIT { autoerr::create_error_v1!( name(Error, "BinBinsTimeweight"), enum variants { + InputBinlenOverflow, Logic, }, ); @@ -54,6 +57,7 @@ where { pub fn new(range: BinnedRange) -> Self { trace_init!("BinnedBinsTimeweight::new {}", range); + let binlen = range.bin_len_dt_ns(); let active_beg = range.nano_beg(); let active_end = active_beg.add_dt_nano(range.bin_len_dt_ns()); Self { @@ -64,8 +68,8 @@ where min: None, max: None, lst: None, - fraction_filled: 0., - agg: BVT::AggregatorTw::new(), + fraction_filled: 1., + agg: BVT::AggregatorTw::new(binlen), non_fnl: false, out: ContainerBins::new(), produce_cnt_zero: false, @@ -92,7 +96,7 @@ where let cnt = self.cnt; let min = self.min.as_ref().unwrap().clone(); let max = self.max.as_ref().unwrap().clone(); - let agg = self.agg.result(self.fraction_filled); + let agg = self.agg.result(); let lst = self.lst.as_ref().unwrap().clone(); let fnl = self.non_fnl == false; trace_emit!( @@ -165,23 +169,29 @@ where self.active_beg ); for (((((((&ts1, &ts2), &cnt), min), max), agg), lst), &fnl) in bins.zip_iter() { - let grid = self.range.bin_len_dt_ns(); + let binlen = self.range.bin_len_dt_ns(); trace_ingest_bin!( - "ingest_bins + + + + grid {:?} ts1 {:?} agg {:?}", - grid, + "ingest_bins + + + + binlen {:?} s ts1 {:?} agg {:?}", + binlen.ms_u64() / 1000, ts1, agg ); if ts1 < self.active_beg { + trace_ingest_bin!("before active-beg: just set lst"); self.lst = Some(lst.into()); } else { if ts1 >= self.active_end { - trace_ingest_bin!("{}", "ingest loop finish current bin"); + trace_ingest_bin!("{}", "ingest loop finish current bin A"); self.maybe_emit_active(); self.active_forward(ts1); } + if ts2 > self.active_end { + trace_ingest_bin!("{}", "ingest loop finish current bin B"); + self.maybe_emit_active(); + self.active_forward(ts2); + } if ts1 == self.active_beg { - trace_ingest_bin!("{}", "HARD SET BOTH MINMAX"); + trace_ingest_bin!("{}", "set minmax"); self.min = Some(min.clone().into()); self.max = Some(max.clone().into()); } @@ -189,12 +199,15 @@ where Self::bound(&mut self.min, min, std::cmp::Ordering::Less); Self::bound(&mut self.max, max, std::cmp::Ordering::Greater); let dt = ts2.delta(ts1); - let bl = self.range.bin_len_dt_ns(); - self.agg.ingest(dt, bl, cnt, agg.into()); + if dt > binlen { + return Err(BinningggError::Dyn(Box::new(Error::InputBinlenOverflow))); + } + trace_ingest_bin!("dt {} s", dt.ms_u64() / 1000); + self.agg.ingest(dt, agg.into()); self.non_fnl |= !fnl; self.lst = Some(lst.into()); if ts2 >= self.active_end { - trace_ingest_bin!("{}", "ingest loop finish current bin"); + trace_ingest_bin!("{}", "ingest loop finish current bin C"); self.maybe_emit_active(); self.active_forward(ts2); } @@ -246,3 +259,41 @@ where } } } + +#[test] +fn test_input_not_covering_first_bin() { + let range = NanoRange::from_strings("1970-01-01T00:10:00Z", "1970-01-01T00:20:00Z").unwrap(); + let binlen = DtMs::from_ms_u64(1000 * 10); + let range = BinnedRange::from_nano_range(range, binlen); + let mut inp = ContainerBins::new(); + let ts1 = TsNano::from_ms(1000 * 60 * 10 + 1000 * 0); + let ts2 = TsNano::from_ms(1000 * 60 * 10 + 1000 * 9); + inp.push_back(ts1, ts2, 1, 1.8, 2.2, 2.0, 1.9, true); + let mut binner = BinnedBinsTimeweight::::new(range); + binner.ingest_bins(&inp).unwrap(); + assert!(binner.output().unwrap().is_none()); +} + +#[test] +fn test_00() { + let range = NanoRange::from_strings("1970-01-01T00:10:00Z", "1970-01-01T00:20:00Z").unwrap(); + let binlen = DtMs::from_ms_u64(1000 * 10); + let range = BinnedRange::from_nano_range(range, binlen); + let mut inp = ContainerBins::new(); + let ts1 = TsNano::from_ms(1000 * 60 * 10 + 1000 * 0); + let ts2 = ts1.add_dt_nano(binlen.dt_ns()); + inp.push_back(ts1, ts2, 1, 1.8, 2.2, 2.0, 1.9, true); + // let ts1 = TsNano::from_ms(1000 * 60 * 10 + 1000 * 10); + // let ts2 = ts1.add_dt_nano(binlen.dt_ns()); + // inp.push_back(ts1, ts2, 1, 1.8, 2.2, 2.0, 1.9, true); + let mut binner = BinnedBinsTimeweight::::new(range); + binner.ingest_bins(&inp).unwrap(); + let out = binner.output().unwrap().unwrap(); + if let Some(bins) = out.as_any_ref().downcast_ref::>() { + for x in bins.zip_iter_2() { + eprintln!("{x:?}"); + } + } else { + panic!() + } +} diff --git a/src/binning/timeweight/timeweight_events.rs b/src/binning/timeweight/timeweight_events.rs index 506b111..f0595a5 100644 --- a/src/binning/timeweight/timeweight_events.rs +++ b/src/binning/timeweight/timeweight_events.rs @@ -407,7 +407,7 @@ where (lst.0.clone(), lst.0.clone()) }); { - let filled_width_fraction = b.filled_width.fraction_of(b.active_len); + let filled_width_fraction = b.filled_width.fraction_f32_of(b.active_len); let res = b.agg.result_and_reset_for_new_bin(filled_width_fraction); trace_ingest_minmax!( "{} push out min {:?} max {:?}",