Attempt better binned bins

This commit is contained in:
Dominik Werder
2024-11-20 16:12:28 +01:00
parent 5de8f847a4
commit e17bb885fc
7 changed files with 184 additions and 59 deletions

View File

@@ -61,7 +61,7 @@ impl AggWithF64 for f64 {
impl<EVT> AggregatorTimeWeight<EVT> for AggregatorNumeric
where
EVT: AggWithF64,
EVT: EventValueType + AggWithF64,
{
fn new() -> Self {
Self { sum: 0. }
@@ -87,7 +87,7 @@ where
sum,
filled_width_fraction
);
self.sum = 0.;
<Self as AggregatorTimeWeight<EVT>>::reset_for_new_bin(self);
sum / filled_width_fraction as f64
}
}

View File

@@ -1,5 +1,3 @@
use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::EventValueType;
use crate::binning::container_events::PartialOrdEvtA;
use items_0::vecpreview::PreviewRange;
use netpod::DtNano;
@@ -13,9 +11,9 @@ where
BVT: BinAggedType,
{
fn new() -> Self;
fn ingest(&mut self, bl: DtNano, val: BVT);
fn ingest(&mut self, dt: DtNano, bl: DtNano, cnt: u64, val: BVT);
fn result(&mut self, filled_width_fraction: f32) -> BVT;
fn reset_for_new_bin(&mut self);
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> BVT;
}
pub trait BinAggedContainer<BVT>:
@@ -33,20 +31,10 @@ pub trait BinAggedType:
fmt::Debug + Clone + PartialOrd + Send + 'static + Serialize + for<'a> Deserialize<'a>
{
type Container: BinAggedContainer<Self>;
type AggregatorTimeWeight: AggBinValTw<Self>;
type AggregatorTw: AggBinValTw<Self>;
type IterTy1<'a>: fmt::Debug + Clone + PartialOrdEvtA<Self> + Into<Self>;
}
impl<EVT, BVT> PreviewRange for ContainerBins<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn preview<'a>(&'a self) -> Box<dyn fmt::Debug + 'a> {
todo!()
}
}
impl<BVT> BinAggedContainer<BVT> for VecDeque<f32>
where
BVT: BinAggedType,
@@ -91,35 +79,64 @@ where
impl BinAggedType for f32 {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = ();
type AggregatorTw = AggBinValTwF32;
type IterTy1<'a> = Self;
}
impl BinAggedType for f64 {
type Container = VecDeque<Self>;
type AggregatorTimeWeight = ();
type AggregatorTw = AggBinValTwF64;
type IterTy1<'a> = Self;
}
impl<T> AggBinValTw<T> for ()
where
T: BinAggedType,
{
#[derive(Debug)]
pub struct AggBinValTwF32 {
sum: f32,
}
impl AggBinValTw<f32> for AggBinValTwF32 {
fn new() -> Self {
todo!()
Self { sum: 0. }
}
fn ingest(&mut self, bl: DtNano, val: T) {
todo!()
fn ingest(&mut self, dt: DtNano, bl: DtNano, cnt: u64, val: f32) {
let f = dt.ns() as f32 / bl.ns() as f32;
self.sum += f * val;
}
fn result(&mut self, filled_width_fraction: f32) -> f32 {
let ret = self.sum.clone() / filled_width_fraction;
<Self as AggBinValTw<f32>>::reset_for_new_bin(self);
ret
}
fn reset_for_new_bin(&mut self) {
todo!()
}
fn result_and_reset_for_new_bin(&mut self, filled_width_fraction: f32) -> T {
todo!()
self.sum = 0.;
}
}
pub struct DummyPayload {}
#[derive(Debug)]
pub struct AggBinValTwF64 {
sum: f64,
}
impl AggBinValTw<f64> for AggBinValTwF64 {
fn new() -> Self {
Self { sum: 0. }
}
fn ingest(&mut self, dt: DtNano, bl: DtNano, cnt: u64, val: f64) {
let f = dt.ns() as f32 / bl.ns() as f32;
self.sum += f as f64 * val;
}
fn result(&mut self, filled_width_fraction: f32) -> f64 {
let ret = self.sum.clone() / filled_width_fraction as f64;
<Self as AggBinValTw<f64>>::reset_for_new_bin(self);
ret
}
fn reset_for_new_bin(&mut self) {
self.sum = 0.;
}
}

View File

@@ -621,10 +621,6 @@ where
EVT: EventValueType,
BVT: BinAggedType,
{
fn type_name(&self) -> &'static str {
any::type_name::<Self>()
}
fn empty(&self) -> BinsBoxed {
Box::new(Self::new())
}
@@ -668,6 +664,7 @@ where
range: netpod::BinnedRange<TsNano>,
) -> Box<dyn items_0::timebin::BinnedBinsTimeweightTrait> {
let ret = super::timeweight::timeweight_bins::BinnedBinsTimeweight::<
EVT,
EVT::AggTimeWeightOutputAvg,
>::new(range);
Box::new(ret)

View File

@@ -29,7 +29,7 @@ fn test_bin_events_f32_simple_01() -> Result<(), Error> {
let fut = async {
let beg = TsNano::from_ms(100);
let end = TsNano::from_ms(500);
let bin_len = DtMs::from_ms_u64(10);
let bin_len = DtMs::from_ms_u64(100);
let nano_range = NanoRange {
beg: beg.ns(),
end: end.ns(),

View File

@@ -5,6 +5,8 @@ use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
use items_0::timebin::BinningggContainerBinsDyn;
use netpod::DtNano;
use netpod::TsNano;
use std::pin::Pin;
pub(super) fn boxed_conts<S>(inp: S) -> Pin<Box<dyn Stream<Item = <S as Stream>::Item> + Send>>
@@ -16,9 +18,19 @@ where
pub(super) fn bins_gen_dim0_f32_v00(
) -> impl Stream<Item = Sitemty<Box<dyn BinningggContainerBinsDyn>>> {
futures_util::stream::iter((0usize..1000).into_iter())
futures_util::stream::iter((9u64..100).into_iter())
.map(|x| {
let c = ContainerBins::<f32, f64>::new();
let mut c = ContainerBins::<f32, f32>::new();
let bl = DtNano::from_ms(10);
let ts1 = TsNano::from_ms(bl.ms_u64() * x);
let ts2 = ts1.add_dt_nano(bl);
let cnt = 8;
let min = 2.;
let max = 4.;
let agg = 2.2;
let lst = 2.4;
let fnl = true;
c.push_back(ts1, ts2, cnt, min, max, agg, lst, fnl);
Box::new(c) as Box<dyn BinningggContainerBinsDyn>
})
.map(|x| Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))))

View File

@@ -1,3 +1,4 @@
use crate::binning::container::bins::AggBinValTw;
use crate::binning::container::bins::BinAggedType;
use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::EventValueType;
@@ -7,59 +8,154 @@ use items_0::timebin::BinningggError;
use items_0::timebin::BinsBoxed;
use netpod::BinnedRange;
use netpod::TsNano;
use std::marker::PhantomData;
use std::any;
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
macro_rules! trace_ingest_bin { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[derive(Debug, thiserror::Error)]
#[cstm(name = "BinBinsTimeweight")]
pub enum Error {}
#[derive(Debug)]
pub struct BinnedBinsTimeweight<BVT>
pub struct BinnedBinsTimeweight<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
range: BinnedRange<TsNano>,
// out: ContainerBins<BVT>,
produce_cnt_zero: bool,
// agg: <BVT as BinValueType>,
t1: PhantomData<BVT>,
active_beg: TsNano,
active_end: TsNano,
cnt: u64,
min: Option<EVT>,
max: Option<EVT>,
lst: Option<EVT>,
agg: <BVT as BinAggedType>::AggregatorTw,
non_fnl: bool,
out: ContainerBins<EVT, BVT>,
}
impl<BVT> BinnedBinsTimeweight<BVT>
impl<EVT, BVT> BinnedBinsTimeweight<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
pub fn new(range: BinnedRange<TsNano>) -> Self {
trace_init!("BinnedBinsTimeweight::new {}", range);
let active_beg = range.nano_beg();
let active_end = active_beg.add_dt_nano(range.bin_len.to_dt_nano());
let active_len = active_end.delta(active_beg);
let active_end = active_beg.add_dt_nano(range.bin_len_dt_ns());
Self {
range,
// out: ContainerBins::new(),
produce_cnt_zero: true,
// agg: todo!(),
t1: PhantomData,
active_beg,
active_end,
cnt: 0,
min: None,
max: None,
lst: None,
agg: BVT::AggregatorTw::new(),
non_fnl: false,
out: ContainerBins::new(),
}
}
fn maybe_emit_active(&mut self) {
if self.cnt != 0 {
let ts1 = self.active_beg;
let ts2 = self.active_end;
let cnt = self.cnt;
let min = self.min.as_ref().unwrap().clone();
let max = self.max.as_ref().unwrap().clone();
let fr = 1.;
let agg = self.agg.result(fr);
self.agg.reset_for_new_bin();
let lst = self.lst.as_ref().unwrap().clone();
let fnl = self.non_fnl == false;
self.out.push_back(ts1, ts2, cnt, min, max, agg, lst, fnl);
}
}
fn active_forward(&mut self, ts1: TsNano) {
self.cnt = 0;
self.min = self.lst.clone();
self.max = self.lst.clone();
let bl = self.range.bin_len_dt_ns();
let tsnext = TsNano::from_ns(ts1.ns() / bl.ns() * bl.ns());
self.active_beg = tsnext;
self.active_end = tsnext.add_dt_nano(bl);
self.non_fnl = false;
}
fn bound(a: &mut Option<EVT>, b: &EVT, f: impl Fn(&EVT, &EVT) -> bool) {
if let Some(x) = a.as_mut() {
if f(b, x) {
*x = b.clone();
}
} else {
*a = Some(b.clone());
}
}
fn ingest_bins(&mut self, bins: &ContainerBins<EVT, BVT>) -> Result<(), BinningggError> {
for (((((((&ts1, &ts2), &cnt), min), max), agg), lst), &fnl) in bins.zip_iter() {
let grid = self.range.bin_len_dt_ns();
trace_ingest_bin!("grid {:?} ts1 {:?} agg {:?}", grid, ts1, agg);
if ts1 < self.active_beg {
self.lst = Some(lst.clone());
} else {
if ts1 >= self.active_end {
self.maybe_emit_active();
self.active_forward(ts1);
}
self.cnt += cnt;
Self::bound(&mut self.min, min, PartialOrd::lt);
Self::bound(&mut self.max, max, PartialOrd::gt);
let dt = ts2.delta(ts1);
let bl = self.range.bin_len_dt_ns();
self.agg.ingest(dt, bl, cnt, agg.clone());
self.non_fnl |= !fnl;
self.lst = Some(lst.clone());
}
}
Ok(())
}
}
impl<BVT> BinnedBinsTimeweightTrait for BinnedBinsTimeweight<BVT>
impl<EVT, BVT> BinnedBinsTimeweightTrait for BinnedBinsTimeweight<EVT, BVT>
where
EVT: EventValueType,
BVT: BinAggedType,
{
fn ingest(&mut self, evs: &BinsBoxed) -> Result<(), BinningggError> {
todo!()
fn ingest(&mut self, bins: &BinsBoxed) -> Result<(), BinningggError> {
if let Some(bins) = bins.as_any_ref().downcast_ref::<ContainerBins<EVT, BVT>>() {
self.ingest_bins(bins)
} else {
Err(BinningggError::TypeMismatch {
have: bins.type_name().into(),
expect: any::type_name::<BVT>().into(),
})
}
}
fn input_done_range_final(&mut self) -> Result<(), BinningggError> {
todo!()
self.maybe_emit_active();
self.active_forward(self.active_beg.add_dt_nano(self.range.bin_len_dt_ns()));
Ok(())
}
fn input_done_range_open(&mut self) -> Result<(), BinningggError> {
todo!()
self.non_fnl = true;
self.maybe_emit_active();
self.active_forward(self.active_beg.add_dt_nano(self.range.bin_len_dt_ns()));
Ok(())
}
fn output(&mut self) -> Result<Option<BinsBoxed>, BinningggError> {
todo!()
if self.out.len() == 0 {
Ok(None)
} else {
let ret = std::mem::replace(&mut self.out, ContainerBins::new());
Ok(Some(Box::new(ret)))
}
}
}

View File

@@ -23,6 +23,9 @@ use items_0::Events;
use std::fmt;
mod log {
#[cfg(not(test))]
pub use netpod::log::*;
#[cfg(test)]
pub use netpod::log::*;
}