diff --git a/src/binning/container_events.rs b/src/binning/container_events.rs index d114286..4760ded 100644 --- a/src/binning/container_events.rs +++ b/src/binning/container_events.rs @@ -1019,7 +1019,6 @@ where } pub fn next(&mut self) -> Option> { - eprintln!("ContainerEvents pos {} end {}", self.pos, self.end); let evs = &self.evs; if self.pos < self.end { if let (Some(&ts), Some(val)) = diff --git a/src/binning/timeweight/timeweight_bins.rs b/src/binning/timeweight/timeweight_bins.rs index 9689656..701665a 100644 --- a/src/binning/timeweight/timeweight_bins.rs +++ b/src/binning/timeweight/timeweight_bins.rs @@ -3,7 +3,7 @@ use crate::binning::container::bins::BinAggedType; use crate::binning::container_bins::ContainerBins; use crate::binning::container_events::EventValueType; use crate::binning::container_events::PartialOrdEvtA; -use crate::log::*; +use crate::log; use items_0::timebin::BinnedBinsTimeweightTrait; use items_0::timebin::BinningggError; use items_0::timebin::BinsBoxed; @@ -12,9 +12,11 @@ use netpod::TsNano; use serde::Serialize; use std::any; -macro_rules! trace_init { ($($arg:expr),*) => ( if true { trace!($($arg),*); }) } +macro_rules! trace_init { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); }) } -macro_rules! trace_ingest_bin { ($($arg:expr),*) => ( if false { trace!($($arg),*); }) } +macro_rules! trace_ingest_bin { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); }) } + +macro_rules! trace_emit { ($($arg:tt)*) => ( if false { log::trace!("BIN EMIT {}", format_args!($($arg)*)); }) } autoerr::create_error_v1!( name(Error, "BinBinsTimeweight"), @@ -36,6 +38,7 @@ where min: Option, max: Option, lst: Option, + fraction_filled: f32, // TODO #[serde(skip)] agg: ::AggregatorTw, @@ -61,6 +64,7 @@ where min: None, max: None, lst: None, + fraction_filled: 0., agg: BVT::AggregatorTw::new(), non_fnl: false, out: ContainerBins::new(), @@ -72,24 +76,62 @@ where self.produce_cnt_zero = true; } + fn reset_for_new_bin(&mut self) { + self.cnt = 0; + self.agg.reset_for_new_bin(); + if self.lst.is_some() { + self.fraction_filled = 1.; + } + } + fn maybe_emit_active(&mut self) { + let selfname = "maybe_emit_active"; if self.cnt != 0 || self.produce_cnt_zero && self.min.is_some() { let ts1 = self.active_beg; let ts2 = self.active_end; let cnt = self.cnt; let min = self.min.as_ref().unwrap().clone(); let max = self.max.as_ref().unwrap().clone(); - let fr = 1.; - let agg = self.agg.result(fr); - self.agg.reset_for_new_bin(); + let agg = self.agg.result(self.fraction_filled); let lst = self.lst.as_ref().unwrap().clone(); let fnl = self.non_fnl == false; + trace_emit!( + "{selfname} push out {} {} cnt {} min {:?} max {:?} agg {:?} lst {:?} fnl {:?}", + ts1, + ts2, + cnt, + min, + max, + agg, + lst, + fnl + ); self.out.push_back(ts1, ts2, cnt, min, max, agg, lst, fnl); + } else { + trace_emit!( + "{selfname} do NOT produce bin cnt {cnt} cz {cz} lst {lst:?} min {min:?} max {max:?}", + cnt = self.cnt, + cz = self.produce_cnt_zero, + lst = self.lst, + min = self.min, + max = self.max + ); } + self.reset_for_new_bin(); } fn active_forward(&mut self, ts1: TsNano) { - self.cnt = 0; + let selfname = "active_forward"; + trace_emit!( + "{selfname} CUR {beg} {end}", + beg = self.active_beg, + end = self.active_end + ); + if self.produce_cnt_zero { + // TODO + // actually produce cnt-zero bins + } else { + } self.min = self.lst.clone(); self.max = self.lst.clone(); let bl = self.range.bin_len_dt_ns(); @@ -97,6 +139,11 @@ where self.active_beg = tsnext; self.active_end = tsnext.add_dt_nano(bl); self.non_fnl = false; + trace_emit!( + "{selfname} NEW {beg} {end}", + beg = self.active_beg, + end = self.active_end + ); } fn bound(a: &mut Option, b: ::IterTy1<'_>, d: std::cmp::Ordering) { @@ -113,16 +160,31 @@ where } fn ingest_bins(&mut self, bins: &ContainerBins) -> Result<(), BinningggError> { + trace_ingest_bin!( + "\n\n+++++++++++++\n\ningest_bins active_beg {active_beg}", + active_beg = self.active_beg + ); for (((((((&ts1, &ts2), &cnt), min), max), agg), lst), &fnl) in bins.zip_iter() { let grid = self.range.bin_len_dt_ns(); - trace_ingest_bin!("ingest_bins grid {:?} ts1 {:?} agg {:?}", grid, ts1, agg); + trace_ingest_bin!( + "ingest_bins + + + + grid {:?} ts1 {:?} agg {:?}", + grid, + ts1, + agg + ); if ts1 < self.active_beg { self.lst = Some(lst.into()); } else { if ts1 >= self.active_end { + trace_ingest_bin!("{}", "ingest loop finish current bin"); self.maybe_emit_active(); self.active_forward(ts1); } + if ts1 == self.active_beg { + trace_ingest_bin!("{}", "HARD SET BOTH MINMAX"); + self.min = Some(min.clone().into()); + self.max = Some(max.clone().into()); + } self.cnt += cnt; Self::bound(&mut self.min, min, std::cmp::Ordering::Less); Self::bound(&mut self.max, max, std::cmp::Ordering::Greater); @@ -131,6 +193,11 @@ where self.agg.ingest(dt, bl, cnt, agg.into()); self.non_fnl |= !fnl; self.lst = Some(lst.into()); + if ts2 >= self.active_end { + trace_ingest_bin!("{}", "ingest loop finish current bin"); + self.maybe_emit_active(); + self.active_forward(ts2); + } } } Ok(()) diff --git a/src/binning/timeweight/timeweight_events.rs b/src/binning/timeweight/timeweight_events.rs index 7cb2b65..5dda5e1 100644 --- a/src/binning/timeweight/timeweight_events.rs +++ b/src/binning/timeweight/timeweight_events.rs @@ -15,35 +15,35 @@ use serde::Serialize; use std::fmt; use std::mem; -macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ) } +macro_rules! info { ($($arg:tt)*) => ( if true { log::info!($($arg)*); } ) } -macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ) } +macro_rules! debug { ($($arg:tt)*) => ( if true { log::debug!($($arg)*); } ) } -macro_rules! trace_ { ($($arg:expr),*) => ( if true { log::trace!($($arg),*); } ) } +macro_rules! trace_ { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ) } -macro_rules! trace_init { ($($arg:expr),*) => ( if true { trace_!($($arg),*); } ) } +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace_!($($arg)*); } ) } -macro_rules! trace_output { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } +macro_rules! trace_output { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -macro_rules! trace_cycle { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } +macro_rules! trace_cycle { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -macro_rules! trace_event_next { ($fmt:expr, $($arg:expr),*) => ( +macro_rules! trace_event_next { ($fmt:expr, $($arg:tt)*) => ( if false { - trace_!("{} {}", "\x1b[1mEVENT POP FRONT\x1b[0m ", format_args!($fmt, $($arg),*)); + trace_!("{} {}", "\x1b[1mEVENT POP FRONT\x1b[0m ", format_args!($fmt, $($arg)*)); } ) } -macro_rules! trace_ingest_init_lst { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } +macro_rules! trace_ingest_init_lst { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -macro_rules! trace_ingest_minmax { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } +macro_rules! trace_ingest_minmax { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -macro_rules! trace_ingest_event { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } +macro_rules! trace_ingest_event { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -macro_rules! trace_ingest_container { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } +macro_rules! trace_ingest_container { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -macro_rules! trace_ingest_container_2 { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } +macro_rules! trace_ingest_container_2 { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } -macro_rules! trace_fill_until { ($($arg:expr),*) => ( if true { trace_!($($arg),*); }) } +macro_rules! trace_fill_until { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } const COL1: &'static str = "\x1b[1m"; const RST: &'static str = "\x1b[0m";