Add test for binning

This commit is contained in:
Dominik Werder
2025-06-16 17:36:20 +02:00
parent b23f32f1f3
commit caf0b83f5b
3 changed files with 127 additions and 30 deletions

View File

@@ -1,19 +1,23 @@
use crate::binning::container_events::PartialOrdEvtA;
use items_0::vecpreview::PreviewRange;
use netpod::DtNano;
use netpod::log;
use serde::Deserialize;
use serde::Serialize;
use std::collections::VecDeque;
use std::fmt;
use std::ops::Range;
macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); }) }
macro_rules! trace_result { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); }) }
pub trait AggBinValTw<BVT>: fmt::Debug + Send
where
BVT: BinAggedType,
{
fn new() -> Self;
fn ingest(&mut self, dt: DtNano, bl: DtNano, cnt: u64, val: BVT);
fn result(&mut self, filled_width_fraction: f32) -> BVT;
fn new(binlen: DtNano) -> Self;
fn ingest(&mut self, dt: DtNano, val: BVT);
fn result(&self) -> BVT;
fn reset_for_new_bin(&mut self);
}
@@ -88,52 +92,94 @@ impl BinAggedType for f64 {
#[derive(Debug)]
pub struct AggBinValTwF32 {
binlen: DtNano,
filled: DtNano,
sum: f32,
}
impl AggBinValTw<f32> for AggBinValTwF32 {
fn new() -> Self {
Self { sum: 0. }
fn new(binlen: DtNano) -> Self {
Self {
binlen,
filled: DtNano::from_ns(0),
sum: 0.,
}
}
fn ingest(&mut self, dt: DtNano, bl: DtNano, _cnt: u64, val: f32) {
let f = dt.ns() as f32 / bl.ns() as f32;
fn ingest(&mut self, dt: DtNano, val: f32) {
type FT = f32;
let f = dt.ns() as FT / self.binlen.ns() as FT;
trace_ingest!(
"ingest dt {} s val {:.4} f {:.4}",
dt.ms_u64() / 1000,
val,
f
);
self.filled = self.filled.add(dt);
self.sum += f * val;
}
fn result(&mut self, filled_width_fraction: f32) -> f32 {
let ret = self.sum.clone() / filled_width_fraction;
<Self as AggBinValTw<f32>>::reset_for_new_bin(self);
fn result(&self) -> f32 {
type FT = f32;
let ret = if self.filled.ns() == 0 {
return FT::NAN;
} else {
let f = self.binlen.ms_u64() as FT / self.filled.ms_u64() as FT;
trace_result!("result sum {:.4} f {:.4}", self.sum, f);
self.sum.clone() * f
};
ret
}
fn reset_for_new_bin(&mut self) {
self.filled = DtNano::from_ns(0);
self.sum = 0.;
}
}
#[derive(Debug)]
pub struct AggBinValTwF64 {
binlen: DtNano,
filled: DtNano,
sum: f64,
}
impl AggBinValTw<f64> for AggBinValTwF64 {
fn new() -> Self {
Self { sum: 0. }
fn new(binlen: DtNano) -> Self {
Self {
binlen,
filled: DtNano::from_ns(0),
sum: 0.,
}
}
fn ingest(&mut self, dt: DtNano, bl: DtNano, _cnt: u64, val: f64) {
let f = dt.ns() as f32 / bl.ns() as f32;
self.sum += f as f64 * val;
fn ingest(&mut self, dt: DtNano, val: f64) {
type FT = f64;
let f = dt.ns() as FT / self.binlen.ns() as FT;
trace_ingest!(
"ingest dt {} s val {:.4} f {:.4}",
dt.ms_u64() / 1000,
val,
f
);
self.filled = self.filled.add(dt);
self.sum += f * val;
}
fn result(&mut self, filled_width_fraction: f32) -> f64 {
let ret = self.sum.clone() / filled_width_fraction as f64;
<Self as AggBinValTw<f64>>::reset_for_new_bin(self);
fn result(&self) -> f64 {
type FT = f64;
let ret = if self.filled.ns() == 0 {
return FT::NAN;
} else {
let f = self.binlen.ms_u64() as FT / self.filled.ms_u64() as FT;
trace_result!("result sum {:.4} f {:.4}", self.sum, f);
self.sum.clone() * f
};
ret
}
fn reset_for_new_bin(&mut self) {
self.filled = DtNano::from_ns(0);
self.sum = 0.;
}
}

View File

@@ -8,7 +8,9 @@ use items_0::timebin::BinnedBinsTimeweightTrait;
use items_0::timebin::BinningggError;
use items_0::timebin::BinsBoxed;
use netpod::BinnedRange;
use netpod::DtMs;
use netpod::TsNano;
use netpod::range::evrange::NanoRange;
use serde::Serialize;
use std::any;
@@ -21,6 +23,7 @@ macro_rules! trace_emit { ($($arg:tt)*) => ( if false { log::trace!("BIN EMIT {
autoerr::create_error_v1!(
name(Error, "BinBinsTimeweight"),
enum variants {
InputBinlenOverflow,
Logic,
},
);
@@ -54,6 +57,7 @@ where
{
pub fn new(range: BinnedRange<TsNano>) -> Self {
trace_init!("BinnedBinsTimeweight::new {}", range);
let binlen = range.bin_len_dt_ns();
let active_beg = range.nano_beg();
let active_end = active_beg.add_dt_nano(range.bin_len_dt_ns());
Self {
@@ -64,8 +68,8 @@ where
min: None,
max: None,
lst: None,
fraction_filled: 0.,
agg: BVT::AggregatorTw::new(),
fraction_filled: 1.,
agg: BVT::AggregatorTw::new(binlen),
non_fnl: false,
out: ContainerBins::new(),
produce_cnt_zero: false,
@@ -92,7 +96,7 @@ where
let cnt = self.cnt;
let min = self.min.as_ref().unwrap().clone();
let max = self.max.as_ref().unwrap().clone();
let agg = self.agg.result(self.fraction_filled);
let agg = self.agg.result();
let lst = self.lst.as_ref().unwrap().clone();
let fnl = self.non_fnl == false;
trace_emit!(
@@ -165,23 +169,29 @@ where
self.active_beg
);
for (((((((&ts1, &ts2), &cnt), min), max), agg), lst), &fnl) in bins.zip_iter() {
let grid = self.range.bin_len_dt_ns();
let binlen = self.range.bin_len_dt_ns();
trace_ingest_bin!(
"ingest_bins + + + + grid {:?} ts1 {:?} agg {:?}",
grid,
"ingest_bins + + + + binlen {:?} s ts1 {:?} agg {:?}",
binlen.ms_u64() / 1000,
ts1,
agg
);
if ts1 < self.active_beg {
trace_ingest_bin!("before active-beg: just set lst");
self.lst = Some(lst.into());
} else {
if ts1 >= self.active_end {
trace_ingest_bin!("{}", "ingest loop finish current bin");
trace_ingest_bin!("{}", "ingest loop finish current bin A");
self.maybe_emit_active();
self.active_forward(ts1);
}
if ts2 > self.active_end {
trace_ingest_bin!("{}", "ingest loop finish current bin B");
self.maybe_emit_active();
self.active_forward(ts2);
}
if ts1 == self.active_beg {
trace_ingest_bin!("{}", "HARD SET BOTH MINMAX");
trace_ingest_bin!("{}", "set minmax");
self.min = Some(min.clone().into());
self.max = Some(max.clone().into());
}
@@ -189,12 +199,15 @@ where
Self::bound(&mut self.min, min, std::cmp::Ordering::Less);
Self::bound(&mut self.max, max, std::cmp::Ordering::Greater);
let dt = ts2.delta(ts1);
let bl = self.range.bin_len_dt_ns();
self.agg.ingest(dt, bl, cnt, agg.into());
if dt > binlen {
return Err(BinningggError::Dyn(Box::new(Error::InputBinlenOverflow)));
}
trace_ingest_bin!("dt {} s", dt.ms_u64() / 1000);
self.agg.ingest(dt, agg.into());
self.non_fnl |= !fnl;
self.lst = Some(lst.into());
if ts2 >= self.active_end {
trace_ingest_bin!("{}", "ingest loop finish current bin");
trace_ingest_bin!("{}", "ingest loop finish current bin C");
self.maybe_emit_active();
self.active_forward(ts2);
}
@@ -246,3 +259,41 @@ where
}
}
}
#[test]
fn test_input_not_covering_first_bin() {
let range = NanoRange::from_strings("1970-01-01T00:10:00Z", "1970-01-01T00:20:00Z").unwrap();
let binlen = DtMs::from_ms_u64(1000 * 10);
let range = BinnedRange::from_nano_range(range, binlen);
let mut inp = ContainerBins::new();
let ts1 = TsNano::from_ms(1000 * 60 * 10 + 1000 * 0);
let ts2 = TsNano::from_ms(1000 * 60 * 10 + 1000 * 9);
inp.push_back(ts1, ts2, 1, 1.8, 2.2, 2.0, 1.9, true);
let mut binner = BinnedBinsTimeweight::<f32, f32>::new(range);
binner.ingest_bins(&inp).unwrap();
assert!(binner.output().unwrap().is_none());
}
#[test]
fn test_00() {
let range = NanoRange::from_strings("1970-01-01T00:10:00Z", "1970-01-01T00:20:00Z").unwrap();
let binlen = DtMs::from_ms_u64(1000 * 10);
let range = BinnedRange::from_nano_range(range, binlen);
let mut inp = ContainerBins::new();
let ts1 = TsNano::from_ms(1000 * 60 * 10 + 1000 * 0);
let ts2 = ts1.add_dt_nano(binlen.dt_ns());
inp.push_back(ts1, ts2, 1, 1.8, 2.2, 2.0, 1.9, true);
// let ts1 = TsNano::from_ms(1000 * 60 * 10 + 1000 * 10);
// let ts2 = ts1.add_dt_nano(binlen.dt_ns());
// inp.push_back(ts1, ts2, 1, 1.8, 2.2, 2.0, 1.9, true);
let mut binner = BinnedBinsTimeweight::<f32, f32>::new(range);
binner.ingest_bins(&inp).unwrap();
let out = binner.output().unwrap().unwrap();
if let Some(bins) = out.as_any_ref().downcast_ref::<ContainerBins<f32, f32>>() {
for x in bins.zip_iter_2() {
eprintln!("{x:?}");
}
} else {
panic!()
}
}

View File

@@ -407,7 +407,7 @@ where
(lst.0.clone(), lst.0.clone())
});
{
let filled_width_fraction = b.filled_width.fraction_of(b.active_len);
let filled_width_fraction = b.filled_width.fraction_f32_of(b.active_len);
let res = b.agg.result_and_reset_for_new_bin(filled_width_fraction);
trace_ingest_minmax!(
"{} push out min {:?} max {:?}",