This commit is contained in:
Dominik Werder
2025-06-05 15:24:50 +02:00
parent ebafcb2b40
commit 235c2c0488
3 changed files with 89 additions and 23 deletions

View File

@@ -1019,7 +1019,6 @@ where
}
pub fn next(&mut self) -> Option<EventSingleRef<EVT>> {
eprintln!("ContainerEvents pos {} end {}", self.pos, self.end);
let evs = &self.evs;
if self.pos < self.end {
if let (Some(&ts), Some(val)) =

View File

@@ -3,7 +3,7 @@ use crate::binning::container::bins::BinAggedType;
use crate::binning::container_bins::ContainerBins;
use crate::binning::container_events::EventValueType;
use crate::binning::container_events::PartialOrdEvtA;
use crate::log::*;
use crate::log;
use items_0::timebin::BinnedBinsTimeweightTrait;
use items_0::timebin::BinningggError;
use items_0::timebin::BinsBoxed;
@@ -12,9 +12,11 @@ use netpod::TsNano;
use serde::Serialize;
use std::any;
macro_rules! trace_init { ($($arg:expr),*) => ( if true { trace!($($arg),*); }) }
macro_rules! trace_init { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); }) }
macro_rules! trace_ingest_bin { ($($arg:expr),*) => ( if false { trace!($($arg),*); }) }
macro_rules! trace_ingest_bin { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); }) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { log::trace!("BIN EMIT {}", format_args!($($arg)*)); }) }
autoerr::create_error_v1!(
name(Error, "BinBinsTimeweight"),
@@ -36,6 +38,7 @@ where
min: Option<EVT>,
max: Option<EVT>,
lst: Option<EVT>,
fraction_filled: f32,
// TODO
#[serde(skip)]
agg: <BVT as BinAggedType>::AggregatorTw,
@@ -61,6 +64,7 @@ where
min: None,
max: None,
lst: None,
fraction_filled: 0.,
agg: BVT::AggregatorTw::new(),
non_fnl: false,
out: ContainerBins::new(),
@@ -72,24 +76,62 @@ where
self.produce_cnt_zero = true;
}
fn reset_for_new_bin(&mut self) {
self.cnt = 0;
self.agg.reset_for_new_bin();
if self.lst.is_some() {
self.fraction_filled = 1.;
}
}
fn maybe_emit_active(&mut self) {
let selfname = "maybe_emit_active";
if self.cnt != 0 || self.produce_cnt_zero && self.min.is_some() {
let ts1 = self.active_beg;
let ts2 = self.active_end;
let cnt = self.cnt;
let min = self.min.as_ref().unwrap().clone();
let max = self.max.as_ref().unwrap().clone();
let fr = 1.;
let agg = self.agg.result(fr);
self.agg.reset_for_new_bin();
let agg = self.agg.result(self.fraction_filled);
let lst = self.lst.as_ref().unwrap().clone();
let fnl = self.non_fnl == false;
trace_emit!(
"{selfname} push out {} {} cnt {} min {:?} max {:?} agg {:?} lst {:?} fnl {:?}",
ts1,
ts2,
cnt,
min,
max,
agg,
lst,
fnl
);
self.out.push_back(ts1, ts2, cnt, min, max, agg, lst, fnl);
} else {
trace_emit!(
"{selfname} do NOT produce bin cnt {cnt} cz {cz} lst {lst:?} min {min:?} max {max:?}",
cnt = self.cnt,
cz = self.produce_cnt_zero,
lst = self.lst,
min = self.min,
max = self.max
);
}
self.reset_for_new_bin();
}
fn active_forward(&mut self, ts1: TsNano) {
self.cnt = 0;
let selfname = "active_forward";
trace_emit!(
"{selfname} CUR {beg} {end}",
beg = self.active_beg,
end = self.active_end
);
if self.produce_cnt_zero {
// TODO
// actually produce cnt-zero bins
} else {
}
self.min = self.lst.clone();
self.max = self.lst.clone();
let bl = self.range.bin_len_dt_ns();
@@ -97,6 +139,11 @@ where
self.active_beg = tsnext;
self.active_end = tsnext.add_dt_nano(bl);
self.non_fnl = false;
trace_emit!(
"{selfname} NEW {beg} {end}",
beg = self.active_beg,
end = self.active_end
);
}
fn bound(a: &mut Option<EVT>, b: <EVT as EventValueType>::IterTy1<'_>, d: std::cmp::Ordering) {
@@ -113,16 +160,31 @@ where
}
fn ingest_bins(&mut self, bins: &ContainerBins<EVT, BVT>) -> Result<(), BinningggError> {
trace_ingest_bin!(
"\n\n+++++++++++++\n\ningest_bins active_beg {active_beg}",
active_beg = self.active_beg
);
for (((((((&ts1, &ts2), &cnt), min), max), agg), lst), &fnl) in bins.zip_iter() {
let grid = self.range.bin_len_dt_ns();
trace_ingest_bin!("ingest_bins grid {:?} ts1 {:?} agg {:?}", grid, ts1, agg);
trace_ingest_bin!(
"ingest_bins + + + + grid {:?} ts1 {:?} agg {:?}",
grid,
ts1,
agg
);
if ts1 < self.active_beg {
self.lst = Some(lst.into());
} else {
if ts1 >= self.active_end {
trace_ingest_bin!("{}", "ingest loop finish current bin");
self.maybe_emit_active();
self.active_forward(ts1);
}
if ts1 == self.active_beg {
trace_ingest_bin!("{}", "HARD SET BOTH MINMAX");
self.min = Some(min.clone().into());
self.max = Some(max.clone().into());
}
self.cnt += cnt;
Self::bound(&mut self.min, min, std::cmp::Ordering::Less);
Self::bound(&mut self.max, max, std::cmp::Ordering::Greater);
@@ -131,6 +193,11 @@ where
self.agg.ingest(dt, bl, cnt, agg.into());
self.non_fnl |= !fnl;
self.lst = Some(lst.into());
if ts2 >= self.active_end {
trace_ingest_bin!("{}", "ingest loop finish current bin");
self.maybe_emit_active();
self.active_forward(ts2);
}
}
}
Ok(())

View File

@@ -15,35 +15,35 @@ use serde::Serialize;
use std::fmt;
use std::mem;
macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ) }
macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ) }
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ) }
macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ) }
macro_rules! trace_ { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) }
macro_rules! trace_ { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) }
macro_rules! trace_init { ($($arg:expr),*) => ( if true { trace_!($($arg),*); } ) }
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); } ) }
macro_rules! trace_output { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_output { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_cycle { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_cycle { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_event_next { ($fmt:expr, $($arg:expr),*) => (
macro_rules! trace_event_next { ($fmt:expr, $($arg:tt)*) => (
if false {
trace_!("{} {}", "\x1b[1mEVENT POP FRONT\x1b[0m ", format_args!($fmt, $($arg),*));
trace_!("{} {}", "\x1b[1mEVENT POP FRONT\x1b[0m ", format_args!($fmt, $($arg)*));
}
) }
macro_rules! trace_ingest_init_lst { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ingest_minmax { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ingest_event { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ingest_container { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_ingest_container_2 { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_ingest_container_2 { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_fill_until { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) }
macro_rules! trace_fill_until { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
const COL1: &'static str = "\x1b[1m";
const RST: &'static str = "\x1b[0m";