Remove limit on empty bins generation

This commit is contained in:
Dominik Werder
2025-07-23 23:56:02 +02:00
parent 3508c463a2
commit 068f6835a5
4 changed files with 43 additions and 28 deletions

View File

@@ -4,12 +4,3 @@ pub mod timeweight_bins_lazy;
pub mod timeweight_bins_stream;
pub mod timeweight_events;
pub mod timeweight_events_dyn;
#[allow(unused)]
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }
#[allow(unused)]
macro_rules! trace_ingest_detail { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) }

View File

@@ -13,9 +13,11 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_input_container { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); }) }
macro_rules! debug { ($($arg:tt)*) => ( if false { log::debug!($($arg)*); }) }
macro_rules! trace_emit { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); }) }
macro_rules! trace_input_container { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); }) }
macro_rules! trace_emit { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); }) }
autoerr::create_error_v1!(
name(Error, "BinnedEventsTimeweightDyn"),
@@ -121,7 +123,8 @@ impl BinnedBinsTimeweightStream {
Ready(Some(Ok(DataItem(Data(x)))))
}
None => {
let item = LogItem::from_node(log::Level::INFO, format!("no bins ready on eos"));
debug!("no bins ready on eos");
let item = LogItem::from_node(log::Level::DEBUG, format!("no bins ready on eos"));
Ready(Some(Ok(Log(item))))
}
}

View File

@@ -45,11 +45,13 @@ macro_rules! trace_ingest_container_2 { ($($arg:tt)*) => ( if true { trace_!($($
macro_rules! trace_fill_until { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
macro_rules! trace_zero_fill { ($($arg:tt)*) => ( if true { trace_!($($arg)*); }) }
const COL1: &'static str = "\x1b[1m";
const RST: &'static str = "\x1b[0m";
const VERIFY_INPUT_EVENTS: bool = false;
const OUT_LEN_MAX: usize = 20000;
const OUT_LEN_MAX: usize = 10000;
#[cold]
#[inline]
@@ -443,6 +445,7 @@ where
inner_a: InnerA<EVT>,
#[serde(skip)]
out: ContainerBins<EVT, EVT::AggTimeWeightOutputAvg>,
input_done_range_final_continue_output: bool,
}
impl<EVT> fmt::Debug for BinnedEventsTimeweight<EVT>
@@ -456,6 +459,10 @@ where
.field("lst", &self.lst)
.field("inner_a", &self.inner_a)
.field("out", &self.out)
.field(
"input_done_range_final_continue_output",
&self.input_done_range_final_continue_output,
)
.finish()
}
}
@@ -492,6 +499,7 @@ where
minmax: None,
},
out: ContainerBins::new(),
input_done_range_final_continue_output: false,
}
}
@@ -579,11 +587,19 @@ where
let mut i = 0;
loop {
i += 1;
assert!(i < 100000, "too many iterations");
assert!(i < 3 * OUT_LEN_MAX, "too many iterations");
let b = &self.inner_a.inner_b;
if self.out.len() > OUT_LEN_MAX {
// TODO change api such that we can produce arbitrary bins as stream.
info!("produced too many bins out len {}", self.out.len());
// TODO instead of log, collect in metrics.
trace_zero_fill!(
"produced too many bins out len {} {} {} {} {}",
self.out.len(),
b.active_beg,
b.active_end,
b.filled_until,
ts
);
break;
} else if ts > b.filled_until {
if ts >= b.active_end {
@@ -657,21 +673,12 @@ where
}
pub fn ingest(&mut self, evs: &ContainerEvents<EVT>) -> Result<IngestReport, Error> {
// It is this type's task to find and store the one-before event.
// We then pass it to the aggregation.
// AggregatorTimeWeight needs a function for that.
// What about counting the events that actually fall into the range?
// Maybe that should be done in this type.
// That way we can pass the values and weights to the aggregation, and count the in-range here.
// This type must also "close" the current aggregation by passing the "last" and init the next.
// ALSO: need to keep track of the "lst". Probably best done in this type as well?
// TODO should rely on external stream adapter for verification to not duplicate things.
if VERIFY_INPUT_EVENTS {
evs.verify()?;
}
let mut nconsumed = 0;
let mut evs = ContainerEventsTakeUpTo::new(evs);
loop {
trace_ingest_container!("+++++++++++++++++++++++++++++++++++++++++++++++++++");
trace_ingest_container!(
@@ -693,6 +700,11 @@ where
b.active_end
);
self.cycle_01(ts);
if self.out.len() > OUT_LEN_MAX {
// TODO collect for metrics
let ret = IngestReport::ConsumedPart(nconsumed);
return Ok(ret);
}
}
let n1 = evs.len();
// TODO instead of mutable constrain/expand, use cheap derived subslices.
@@ -725,6 +737,7 @@ where
debug!("{}", e);
return Err(e);
} else {
nconsumed += n1 - n2;
continue;
}
} else {
@@ -737,6 +750,9 @@ where
pub fn input_done_range_final(&mut self) -> Result<(), Error> {
trace_cycle!("{}input_done_range_final{}", COL1, RST);
self.cycle_01(self.range.nano_end());
if self.out.len() > OUT_LEN_MAX {
self.input_done_range_final_continue_output = true;
}
Ok(())
}
@@ -751,6 +767,11 @@ where
}
pub fn output(&mut self) -> ContainerBins<EVT, EVT::AggTimeWeightOutputAvg> {
mem::replace(&mut self.out, ContainerBins::new())
let ret = mem::replace(&mut self.out, ContainerBins::new());
if self.input_done_range_final_continue_output {
self.input_done_range_final_continue_output = false;
self.cycle_01(self.range.nano_end());
}
ret
}
}

View File

@@ -394,9 +394,9 @@ impl BinnedEventsTimeweightStream {
Ready(Some(Ok(DataItem(Data(x)))))
}
None => {
debug!("no bins ready on eos");
self.state = StreamState::Done;
let item = LogItem::from_node(log::Level::INFO, format!("no bins ready on eos"));
debug!("no bins ready on eos");
let item = LogItem::from_node(log::Level::DEBUG, format!("no bins ready on eos"));
Ready(Some(Ok(Log(item))))
}
}