Read from prebinned

Dominik Werder
2024-12-09 23:06:05 +01:00
parent 1af83c30d8
commit 402c5e41ee
3 changed files with 77 additions and 27 deletions


@@ -43,6 +43,17 @@ pub enum PrebinnedPartitioning {
}
impl PrebinnedPartitioning {
pub fn bin_len(&self) -> DtMs {
use PrebinnedPartitioning::*;
match self {
Sec1 => DtMs::from_ms_u64(1000 * 1),
Sec10 => DtMs::from_ms_u64(1000 * 10),
Min1 => DtMs::from_ms_u64(1000 * 60 * 1),
Min10 => DtMs::from_ms_u64(1000 * 60 * 10),
Hour1 => DtMs::from_ms_u64(1000 * 60 * 60 * 1),
}
}
pub fn msp_div(&self) -> DtMs {
use PrebinnedPartitioning::*;
match self {
@@ -53,14 +64,33 @@ impl PrebinnedPartitioning {
Hour1 => DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 28),
}
}
pub fn off_max(&self) -> u32 {
self.msp_div().ms() as u32 / self.bin_len().ms() as u32
}
pub fn clamp_off(&self, off: u32) -> u32 {
self.off_max().min(off)
}
}
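// Illustrative sketch, not part of the commit: how off_max and clamp_off behave for
// the Hour1 variant, using the bin_len and msp_div values from the match arms above
// (1 h bins, 28 d partitions, hence 24 * 28 = 672 bins per partition).
#[test]
fn clamp_off_hour1_sketch() {
    let p = PrebinnedPartitioning::Hour1;
    assert_eq!(p.off_max(), 24 * 28);
    // offsets inside the partition pass through, anything past the end is pulled back
    assert_eq!(p.clamp_off(10), 10);
    assert_eq!(p.clamp_off(100_000), 24 * 28);
}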
impl TryFrom<DtMs> for PrebinnedPartitioning {
type Error = Error;
fn try_from(value: DtMs) -> Result<Self, Self::Error> {
match value {
_ => Err(Error::PrebinnedPartitioningInvalid),
use PrebinnedPartitioning::*;
if value == DtMs::from_ms_u64(1000) {
Ok(Sec1)
} else if value == DtMs::from_ms_u64(1000 * 10) {
Ok(Sec10)
} else if value == DtMs::from_ms_u64(1000 * 60) {
Ok(Min1)
} else if value == DtMs::from_ms_u64(1000 * 60 * 10) {
Ok(Min10)
} else if value == DtMs::from_ms_u64(1000 * 60 * 60) {
Ok(Hour1)
} else {
Err(Error::PrebinnedPartitioningInvalid)
}
}
}
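// Usage sketch, not part of the commit: TryFrom only maps the five exact bin lengths
// listed above to a variant; every other DtMs value is rejected with
// Error::PrebinnedPartitioningInvalid.
#[test]
fn prebinned_partitioning_try_from_sketch() {
    assert!(matches!(
        PrebinnedPartitioning::try_from(DtMs::from_ms_u64(1000 * 60)),
        Ok(PrebinnedPartitioning::Min1)
    ));
    assert!(PrebinnedPartitioning::try_from(DtMs::from_ms_u64(5000)).is_err());
}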
@@ -180,7 +210,9 @@ impl Stream for CachedReader {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
let ts1next = self.ts1next.clone();
break if let Some(fut) = self.reading.as_mut() {
info!("CachedReader poll reading ts1next {:?}", ts1next);
match fut.poll_unpin(cx) {
Ready(x) => {
self.reading = None;
@@ -203,6 +235,7 @@ impl Stream for CachedReader {
}
} else {
if self.ts1next < self.range.nano_end() {
info!("CachedReader try setup next ts1next {:?}", self.ts1next);
match PrebinnedPartitioning::try_from(self.range.bin_len_dt_ms()) {
Ok(partt) => {
let binlen = self.bin_len.ns();
@@ -210,7 +243,8 @@ impl Stream for CachedReader {
let msp = self.ts1next.ns() / div;
let off1 = (self.ts1next.ns() - div * msp) / binlen;
let off2 = (self.range.nano_end().ns() - div * msp) / binlen;
self.ts1next = TsNano::from_ns(binlen * off2 + div * msp);
let off2 = partt.clamp_off(off2 as u32);
self.ts1next = TsNano::from_ns(binlen * off2 as u64 + div * msp);
let offs = off1 as u32..off2 as u32;
let fut =
self.cache_read_provider
@@ -224,6 +258,7 @@ impl Stream for CachedReader {
}
}
} else {
info!("CachedReader no more setup ts1next {:?}", self.ts1next);
Ready(None)
}
};
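The offset arithmetic in the hunk above is easier to follow with concrete numbers. Below is a minimal sketch, assuming Hour1 partitioning and that div (assigned just above the shown lines) is msp_div() expressed in nanoseconds; the test name and literal values are illustrative only.

#[test]
fn cached_reader_offset_sketch() {
    let binlen: u64 = 3_600_000_000_000; // 1 h bin length in ns
    let div: u64 = 28 * 24 * binlen; // 28 d partition length in ns
    let ts1next: u64 = 3 * div + 5 * binlen; // currently at bin 5 of partition 3
    let range_end: u64 = 3 * div + 700 * binlen; // requested end lies past this partition
    let msp = ts1next / div; // 3: index of the current partition
    let off1 = (ts1next - div * msp) / binlen; // 5: first bin offset to read
    let off2 = (range_end - div * msp) / binlen; // 700: would overrun the 672-bin partition
    let off2 = off2.min(672); // what clamp_off does for Hour1
    assert_eq!((msp, off1, off2), (3, 5, 672));
    // ts1next then advances to div * msp + binlen * off2, i.e. the end of this
    // partition, so the next poll starts a fresh read in partition 4.
}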


@@ -25,7 +25,7 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) }
autoerr::create_error_v1!(
name(Error, "TimeBinnedFromLayers"),
@@ -61,11 +61,12 @@ impl TimeBinnedFromLayers {
events_read_provider: Arc<dyn EventsReadProvider>,
) -> Result<Self, Error> {
trace_init!(
"{}::new {:?} {:?} {:?}",
"{}::new {:?} {:?} {:?} {:?}",
Self::type_name(),
ch_conf.series(),
range,
bin_len_layers
bin_len_layers,
binning_opts
);
let bin_len = DtMs::from_ms_u64(range.bin_len.ms());
if bin_len_layers.contains(&bin_len) {
@@ -123,26 +124,37 @@ impl TimeBinnedFromLayers {
Ok(ret)
}
None => {
trace_init!("{}::new next finer from events", Self::type_name());
let series_range = SeriesRange::TimeRange(range.to_nano_range());
let one_before_range = true;
let select = EventsSubQuerySelect::new(
ch_conf.clone(),
series_range,
one_before_range,
transform_query.clone(),
);
let evq = EventsSubQuery::from_parts(
select,
sub.clone(),
ctx.reqid().into(),
log_level.clone(),
);
let inp =
BinnedFromEvents::new(range, evq, do_time_weight, events_read_provider)?;
let ret = Self { inp: Box::pin(inp) };
trace_init!("{}::new setup from events", Self::type_name());
Ok(ret)
if binning_opts.allow_from_events() {
trace_init!("{}::new next finer from events", Self::type_name());
let series_range = SeriesRange::TimeRange(range.to_nano_range());
let one_before_range = true;
let select = EventsSubQuerySelect::new(
ch_conf.clone(),
series_range,
one_before_range,
transform_query.clone(),
);
let evq = EventsSubQuery::from_parts(
select,
sub.clone(),
ctx.reqid().into(),
log_level.clone(),
);
let inp = BinnedFromEvents::new(
range,
evq,
do_time_weight,
events_read_provider,
)?;
let ret = Self { inp: Box::pin(inp) };
trace_init!("{}::new setup from events", Self::type_name());
Ok(ret)
} else {
let inp = futures_util::stream::iter([]);
let ret = Self { inp: Box::pin(inp) };
trace_init!("{}::new setup nothing", Self::type_name());
Ok(ret)
}
}
}
}
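Both the TimeBinnedFromLayers change above and the GapFill change below fall back to an empty stream when no finer prebinned layer exists and binning from events is disabled, so the consumer simply sees end-of-stream. A small standalone sketch of that fallback behaviour (the u32 item type and the futures facade crate are stand-ins, not the project's own types):

use futures::executor::block_on;
use futures::StreamExt;

fn main() {
    // the "setup nothing" branch boils down to a stream with zero items
    let mut inp = futures::stream::iter(Vec::<u32>::new());
    // the very first poll already reports the stream as finished
    assert!(block_on(inp.next()).is_none());
}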


@@ -285,7 +285,7 @@ impl GapFill {
panic!("TODO unweighted")
};
self.inp_finer = Some(Box::pin(stream));
} else {
} else if self.binning_opts.allow_from_events() {
debug_setup!(
"{} setup_inp_finer next finer from events {}",
self.dbgname,
@@ -313,6 +313,9 @@ impl GapFill {
self.events_read_provider.clone(),
)?;
self.inp_finer = Some(Box::pin(inp));
} else {
let stream = futures_util::stream::iter([]);
self.inp_finer = Some(Box::pin(stream));
}
Ok(())
}