From 068f6835a5abb29643b5e1abeceb808708c419f9 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 23 Jul 2025 23:56:02 +0200 Subject: [PATCH] Remove limit on empty bins generation --- src/binning/timeweight.rs | 9 ---- .../timeweight/timeweight_bins_stream.rs | 9 ++-- src/binning/timeweight/timeweight_events.rs | 49 +++++++++++++------ .../timeweight/timeweight_events_dyn.rs | 4 +- 4 files changed, 43 insertions(+), 28 deletions(-) diff --git a/src/binning/timeweight.rs b/src/binning/timeweight.rs index 230b8a3..cc53590 100644 --- a/src/binning/timeweight.rs +++ b/src/binning/timeweight.rs @@ -4,12 +4,3 @@ pub mod timeweight_bins_lazy; pub mod timeweight_bins_stream; pub mod timeweight_events; pub mod timeweight_events_dyn; - -#[allow(unused)] -macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } - -#[allow(unused)] -macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } - -#[allow(unused)] -macro_rules! trace_ingest_detail { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } diff --git a/src/binning/timeweight/timeweight_bins_stream.rs b/src/binning/timeweight/timeweight_bins_stream.rs index f2f0752..25aeffc 100644 --- a/src/binning/timeweight/timeweight_bins_stream.rs +++ b/src/binning/timeweight/timeweight_bins_stream.rs @@ -13,9 +13,11 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -macro_rules! trace_input_container { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); }) } +macro_rules! debug { ($($arg:tt)*) => ( if false { log::debug!($($arg)*); }) } -macro_rules! trace_emit { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); }) } +macro_rules! trace_input_container { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); }) } + +macro_rules! trace_emit { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); }) } autoerr::create_error_v1!( name(Error, "BinnedEventsTimeweightDyn"), @@ -121,7 +123,8 @@ impl BinnedBinsTimeweightStream { Ready(Some(Ok(DataItem(Data(x))))) } None => { - let item = LogItem::from_node(log::Level::INFO, format!("no bins ready on eos")); + debug!("no bins ready on eos"); + let item = LogItem::from_node(log::Level::DEBUG, format!("no bins ready on eos")); Ready(Some(Ok(Log(item)))) } } diff --git a/src/binning/timeweight/timeweight_events.rs b/src/binning/timeweight/timeweight_events.rs index f0595a5..6041f74 100644 --- a/src/binning/timeweight/timeweight_events.rs +++ b/src/binning/timeweight/timeweight_events.rs @@ -45,11 +45,13 @@ macro_rules! trace_ingest_container_2 { ($($arg:tt)*) => ( if true { trace_!($($ macro_rules! trace_fill_until { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } +macro_rules! trace_zero_fill { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) } + const COL1: &'static str = "\x1b[1m"; const RST: &'static str = "\x1b[0m"; const VERIFY_INPUT_EVENTS: bool = false; -const OUT_LEN_MAX: usize = 20000; +const OUT_LEN_MAX: usize = 10000; #[cold] #[inline] @@ -443,6 +445,7 @@ where inner_a: InnerA, #[serde(skip)] out: ContainerBins, + input_done_range_final_continue_output: bool, } impl fmt::Debug for BinnedEventsTimeweight @@ -456,6 +459,10 @@ where .field("lst", &self.lst) .field("inner_a", &self.inner_a) .field("out", &self.out) + .field( + "input_done_range_final_continue_output", + &self.input_done_range_final_continue_output, + ) .finish() } } @@ -492,6 +499,7 @@ where minmax: None, }, out: ContainerBins::new(), + input_done_range_final_continue_output: false, } } @@ -579,11 +587,19 @@ where let mut i = 0; loop { i += 1; - assert!(i < 100000, "too many iterations"); + assert!(i < 3 * OUT_LEN_MAX, "too many iterations"); let b = &self.inner_a.inner_b; if self.out.len() > OUT_LEN_MAX { // TODO change api such that we can produce arbitrary bins as stream. - info!("produced too many bins out len {}", self.out.len()); + // TODO instead of log, collect in metrics. + trace_zero_fill!( + "produced too many bins out len {} {} {} {} {}", + self.out.len(), + b.active_beg, + b.active_end, + b.filled_until, + ts + ); break; } else if ts > b.filled_until { if ts >= b.active_end { @@ -657,21 +673,12 @@ where } pub fn ingest(&mut self, evs: &ContainerEvents) -> Result { - // It is this type's task to find and store the one-before event. - // We then pass it to the aggregation. - // AggregatorTimeWeight needs a function for that. - // What about counting the events that actually fall into the range? - // Maybe that should be done in this type. - // That way we can pass the values and weights to the aggregation, and count the in-range here. - // This type must also "close" the current aggregation by passing the "last" and init the next. - // ALSO: need to keep track of the "lst". Probably best done in this type as well? - // TODO should rely on external stream adapter for verification to not duplicate things. if VERIFY_INPUT_EVENTS { evs.verify()?; } + let mut nconsumed = 0; let mut evs = ContainerEventsTakeUpTo::new(evs); - loop { trace_ingest_container!("+++++++++++++++++++++++++++++++++++++++++++++++++++"); trace_ingest_container!( @@ -693,6 +700,11 @@ where b.active_end ); self.cycle_01(ts); + if self.out.len() > OUT_LEN_MAX { + // TODO collect for metrics + let ret = IngestReport::ConsumedPart(nconsumed); + return Ok(ret); + } } let n1 = evs.len(); // TODO instead of mutable constrain/expand, use cheap derived subslices. @@ -725,6 +737,7 @@ where debug!("{}", e); return Err(e); } else { + nconsumed += n1 - n2; continue; } } else { @@ -737,6 +750,9 @@ where pub fn input_done_range_final(&mut self) -> Result<(), Error> { trace_cycle!("{}input_done_range_final{}", COL1, RST); self.cycle_01(self.range.nano_end()); + if self.out.len() > OUT_LEN_MAX { + self.input_done_range_final_continue_output = true; + } Ok(()) } @@ -751,6 +767,11 @@ where } pub fn output(&mut self) -> ContainerBins { - mem::replace(&mut self.out, ContainerBins::new()) + let ret = mem::replace(&mut self.out, ContainerBins::new()); + if self.input_done_range_final_continue_output { + self.input_done_range_final_continue_output = false; + self.cycle_01(self.range.nano_end()); + } + ret } } diff --git a/src/binning/timeweight/timeweight_events_dyn.rs b/src/binning/timeweight/timeweight_events_dyn.rs index 98c02d3..e9e1237 100644 --- a/src/binning/timeweight/timeweight_events_dyn.rs +++ b/src/binning/timeweight/timeweight_events_dyn.rs @@ -394,9 +394,9 @@ impl BinnedEventsTimeweightStream { Ready(Some(Ok(DataItem(Data(x))))) } None => { - debug!("no bins ready on eos"); self.state = StreamState::Done; - let item = LogItem::from_node(log::Level::INFO, format!("no bins ready on eos")); + debug!("no bins ready on eos"); + let item = LogItem::from_node(log::Level::DEBUG, format!("no bins ready on eos")); Ready(Some(Ok(Log(item)))) } }