From 402c5e41eeb67413533eb7a582471a02cd7d7b70 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 9 Dec 2024 23:06:05 +0100 Subject: [PATCH] Read from prebinned --- src/timebin/cached/reader.rs | 41 +++++++++++++++++++++++-- src/timebin/fromlayers.rs | 58 ++++++++++++++++++++++-------------- src/timebin/gapfill.rs | 5 +++- 3 files changed, 77 insertions(+), 27 deletions(-) diff --git a/src/timebin/cached/reader.rs b/src/timebin/cached/reader.rs index 67fa5a4..0dd30bc 100644 --- a/src/timebin/cached/reader.rs +++ b/src/timebin/cached/reader.rs @@ -43,6 +43,17 @@ pub enum PrebinnedPartitioning { } impl PrebinnedPartitioning { + pub fn bin_len(&self) -> DtMs { + use PrebinnedPartitioning::*; + match self { + Sec1 => DtMs::from_ms_u64(1000 * 1), + Sec10 => DtMs::from_ms_u64(1000 * 10), + Min1 => DtMs::from_ms_u64(1000 * 60 * 1), + Min10 => DtMs::from_ms_u64(1000 * 60 * 10), + Hour1 => DtMs::from_ms_u64(1000 * 60 * 60 * 1), + } + } + pub fn msp_div(&self) -> DtMs { use PrebinnedPartitioning::*; match self { @@ -53,14 +64,33 @@ impl PrebinnedPartitioning { Hour1 => DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 28), } } + + pub fn off_max(&self) -> u32 { + self.msp_div().ms() as u32 / self.bin_len().ms() as u32 + } + + pub fn clamp_off(&self, off: u32) -> u32 { + self.off_max().min(off) + } } impl TryFrom for PrebinnedPartitioning { type Error = Error; fn try_from(value: DtMs) -> Result { - match value { - _ => Err(Error::PrebinnedPartitioningInvalid), + use PrebinnedPartitioning::*; + if value == DtMs::from_ms_u64(1000) { + Ok(Sec1) + } else if value == DtMs::from_ms_u64(1000 * 10) { + Ok(Sec10) + } else if value == DtMs::from_ms_u64(1000 * 60) { + Ok(Min1) + } else if value == DtMs::from_ms_u64(1000 * 60 * 10) { + Ok(Min10) + } else if value == DtMs::from_ms_u64(1000 * 60 * 60) { + Ok(Hour1) + } else { + Err(Error::PrebinnedPartitioningInvalid) } } } @@ -180,7 +210,9 @@ impl Stream for CachedReader { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; loop { + let ts1next = self.ts1next.clone(); break if let Some(fut) = self.reading.as_mut() { + info!("CachedReader poll reading ts1next {:?}", ts1next); match fut.poll_unpin(cx) { Ready(x) => { self.reading = None; @@ -203,6 +235,7 @@ impl Stream for CachedReader { } } else { if self.ts1next < self.range.nano_end() { + info!("CachedReader try setup next ts1next {:?}", self.ts1next); match PrebinnedPartitioning::try_from(self.range.bin_len_dt_ms()) { Ok(partt) => { let binlen = self.bin_len.ns(); @@ -210,7 +243,8 @@ impl Stream for CachedReader { let msp = self.ts1next.ns() / div; let off1 = (self.ts1next.ns() - div * msp) / binlen; let off2 = (self.range.nano_end().ns() - div * msp) / binlen; - self.ts1next = TsNano::from_ns(binlen * off2 + div * msp); + let off2 = partt.clamp_off(off2 as u32); + self.ts1next = TsNano::from_ns(binlen * off2 as u64 + div * msp); let offs = off1 as u32..off2 as u32; let fut = self.cache_read_provider @@ -224,6 +258,7 @@ impl Stream for CachedReader { } } } else { + info!("CachedReader no more setup ts1next {:?}", self.ts1next); Ready(None) } }; diff --git a/src/timebin/fromlayers.rs b/src/timebin/fromlayers.rs index d5a513b..b20ce9c 100644 --- a/src/timebin/fromlayers.rs +++ b/src/timebin/fromlayers.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -macro_rules! trace_init { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) } +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } autoerr::create_error_v1!( name(Error, "TimeBinnedFromLayers"), @@ -61,11 +61,12 @@ impl TimeBinnedFromLayers { events_read_provider: Arc, ) -> Result { trace_init!( - "{}::new {:?} {:?} {:?}", + "{}::new {:?} {:?} {:?} {:?}", Self::type_name(), ch_conf.series(), range, - bin_len_layers + bin_len_layers, + binning_opts ); let bin_len = DtMs::from_ms_u64(range.bin_len.ms()); if bin_len_layers.contains(&bin_len) { @@ -123,26 +124,37 @@ impl TimeBinnedFromLayers { Ok(ret) } None => { - trace_init!("{}::new next finer from events", Self::type_name()); - let series_range = SeriesRange::TimeRange(range.to_nano_range()); - let one_before_range = true; - let select = EventsSubQuerySelect::new( - ch_conf.clone(), - series_range, - one_before_range, - transform_query.clone(), - ); - let evq = EventsSubQuery::from_parts( - select, - sub.clone(), - ctx.reqid().into(), - log_level.clone(), - ); - let inp = - BinnedFromEvents::new(range, evq, do_time_weight, events_read_provider)?; - let ret = Self { inp: Box::pin(inp) }; - trace_init!("{}::new setup from events", Self::type_name()); - Ok(ret) + if binning_opts.allow_from_events() { + trace_init!("{}::new next finer from events", Self::type_name()); + let series_range = SeriesRange::TimeRange(range.to_nano_range()); + let one_before_range = true; + let select = EventsSubQuerySelect::new( + ch_conf.clone(), + series_range, + one_before_range, + transform_query.clone(), + ); + let evq = EventsSubQuery::from_parts( + select, + sub.clone(), + ctx.reqid().into(), + log_level.clone(), + ); + let inp = BinnedFromEvents::new( + range, + evq, + do_time_weight, + events_read_provider, + )?; + let ret = Self { inp: Box::pin(inp) }; + trace_init!("{}::new setup from events", Self::type_name()); + Ok(ret) + } else { + let inp = futures_util::stream::iter([]); + let ret = Self { inp: Box::pin(inp) }; + trace_init!("{}::new setup nothing", Self::type_name()); + Ok(ret) + } } } } diff --git a/src/timebin/gapfill.rs b/src/timebin/gapfill.rs index 40c60f2..cd41f0e 100644 --- a/src/timebin/gapfill.rs +++ b/src/timebin/gapfill.rs @@ -285,7 +285,7 @@ impl GapFill { panic!("TODO unweighted") }; self.inp_finer = Some(Box::pin(stream)); - } else { + } else if self.binning_opts.allow_from_events() { debug_setup!( "{} setup_inp_finer next finer from events {}", self.dbgname, @@ -313,6 +313,9 @@ impl GapFill { self.events_read_provider.clone(), )?; self.inp_finer = Some(Box::pin(inp)); + } else { + let stream = futures_util::stream::iter([]); + self.inp_finer = Some(Box::pin(stream)); } Ok(()) }