From 049266bfe5bb1eecc46c4d7c3b0091fbc75977f2 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 18 Sep 2024 23:59:03 +0200 Subject: [PATCH] WIP --- crates/items_2/src/binning.rs | 5 ++ .../items_2/src/binning/container_events.rs | 42 ++++++++++++++ crates/items_2/src/binning/test.rs | 2 + crates/items_2/src/binning/timeweight.rs | 14 +++++ .../src/binning/timeweight/timeweight_bins.rs | 5 ++ .../binning/timeweight/timeweight_events.rs | 31 ++++++++++ crates/items_2/src/binsdim0.rs | 42 +++++++++++--- crates/items_2/src/channelevents.rs | 22 ++++--- crates/items_2/src/eventsdim0.rs | 6 +- crates/items_2/src/items_2.rs | 1 + crates/items_2/src/timebin.rs | 57 +++++++------------ crates/netpod/src/netpod.rs | 14 ++++- crates/scyllaconn/src/schema.rs | 12 +++- crates/streams/src/timebin/gapfill.rs | 23 ++++++-- 14 files changed, 211 insertions(+), 65 deletions(-) create mode 100644 crates/items_2/src/binning.rs create mode 100644 crates/items_2/src/binning/container_events.rs create mode 100644 crates/items_2/src/binning/test.rs create mode 100644 crates/items_2/src/binning/timeweight.rs create mode 100644 crates/items_2/src/binning/timeweight/timeweight_bins.rs create mode 100644 crates/items_2/src/binning/timeweight/timeweight_events.rs diff --git a/crates/items_2/src/binning.rs b/crates/items_2/src/binning.rs new file mode 100644 index 0000000..dcd36c7 --- /dev/null +++ b/crates/items_2/src/binning.rs @@ -0,0 +1,5 @@ +pub mod container_events; +pub mod test; +pub mod timeweight; + +use super::binning as ___; diff --git a/crates/items_2/src/binning/container_events.rs b/crates/items_2/src/binning/container_events.rs new file mode 100644 index 0000000..516ae68 --- /dev/null +++ b/crates/items_2/src/binning/container_events.rs @@ -0,0 +1,42 @@ +use super::___; +use netpod::TsNano; +use serde::Deserialize; +use serde::Serialize; +use std::collections::VecDeque; + +#[allow(unused)] +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } + +pub trait Container: Clone {} + +impl Container for VecDeque where T: EventValueType {} + +pub trait EventValueType: Clone { + type Container: Container; +} + +impl EventValueType for f32 { + type Container = VecDeque; +} + +#[derive(Clone)] +pub struct ContainerEvents +where + EVT: EventValueType, +{ + tss: VecDeque, + // vals: VecDeque, + vals: VecDeque<::Container>, +} + +// TODO why does this already impl Serialize even though there is no bound for EVT? +// TODO try to actually instantiate and serialize in a test. + +#[derive(Clone, Serialize, Deserialize)] +pub struct ContainerEvents2 +where + EVT: EventValueType, +{ + tss: VecDeque, + vals: VecDeque, +} diff --git a/crates/items_2/src/binning/test.rs b/crates/items_2/src/binning/test.rs new file mode 100644 index 0000000..378e0e6 --- /dev/null +++ b/crates/items_2/src/binning/test.rs @@ -0,0 +1,2 @@ +use super::___; +use netpod::log::*; diff --git a/crates/items_2/src/binning/timeweight.rs b/crates/items_2/src/binning/timeweight.rs new file mode 100644 index 0000000..0b44961 --- /dev/null +++ b/crates/items_2/src/binning/timeweight.rs @@ -0,0 +1,14 @@ +pub mod timeweight_bins; +pub mod timeweight_events; + +use super::___; +use netpod::log::*; + +#[allow(unused)] +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } + +#[allow(unused)] +macro_rules! trace_ingest_detail { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } diff --git a/crates/items_2/src/binning/timeweight/timeweight_bins.rs b/crates/items_2/src/binning/timeweight/timeweight_bins.rs new file mode 100644 index 0000000..320924c --- /dev/null +++ b/crates/items_2/src/binning/timeweight/timeweight_bins.rs @@ -0,0 +1,5 @@ +use super::___; +use netpod::log::*; + +#[allow(unused)] +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } diff --git a/crates/items_2/src/binning/timeweight/timeweight_events.rs b/crates/items_2/src/binning/timeweight/timeweight_events.rs new file mode 100644 index 0000000..fc777ae --- /dev/null +++ b/crates/items_2/src/binning/timeweight/timeweight_events.rs @@ -0,0 +1,31 @@ +use super::super::container_events::EventValueType; +use super::___; +use futures_util::Stream; +use netpod::log::*; +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[allow(unused)] +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } + +pub struct BinnedEventsTimeweight +where + EVT: EventValueType, +{ + _evt: PhantomData, +} + +impl BinnedEventsTimeweight where EVT: EventValueType {} + +pub struct BinnedEventsTimeweightStream {} + +impl Stream for BinnedEventsTimeweightStream { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + todo!() + } +} diff --git a/crates/items_2/src/binsdim0.rs b/crates/items_2/src/binsdim0.rs index d66deb6..d0d408e 100644 --- a/crates/items_2/src/binsdim0.rs +++ b/crates/items_2/src/binsdim0.rs @@ -52,10 +52,7 @@ use std::mem; use std::ops::Range; #[allow(unused)] -macro_rules! trace44 { - ($($arg:tt)*) => (); - ($($arg:tt)*) => (eprintln!($($arg)*)); -} +macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } // TODO make members private #[derive(Clone, PartialEq, Serialize, Deserialize)] @@ -82,6 +79,9 @@ where { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { let self_name = any::type_name::(); + // if true { + // return fmt::Display::fmt(self, fmt); + // } if true { write!( fmt, @@ -163,7 +163,7 @@ where let self_name = any::type_name::(); write!( fmt, - "{self_name} {{ len: {:?}, ts1s: {}, ts2s {}, counts {}, mins {}, maxs {}, avgs {} }}", + "{self_name} {{ len: {:?}, ts1s: {}, ts2s {}, counts {}, mins {}, maxs {}, avgs {}, lsts {} }}", self.len(), VecPreview::new(&self.ts1s), VecPreview::new(&self.ts2s), @@ -171,6 +171,7 @@ where VecPreview::new(&self.mins), VecPreview::new(&self.maxs), VecPreview::new(&self.avgs), + VecPreview::new(&self.lsts), ) } } @@ -296,6 +297,7 @@ impl Resettable for BinsDim0 { self.mins.clear(); self.maxs.clear(); self.avgs.clear(); + self.lsts.clear(); } } @@ -327,23 +329,27 @@ items_0::impl_range_overlap_info_bins!(BinsDim0); impl AppendEmptyBin for BinsDim0 { fn append_empty_bin(&mut self, ts1: u64, ts2: u64) { + error!("AppendEmptyBin::append_empty_bin should not get used"); self.ts1s.push_back(ts1); self.ts2s.push_back(ts2); self.cnts.push_back(0); self.mins.push_back(NTY::zero_b()); self.maxs.push_back(NTY::zero_b()); self.avgs.push_back(0.); + self.lsts.push_back(NTY::zero_b()); } } impl AppendAllFrom for BinsDim0 { fn append_all_from(&mut self, src: &mut Self) { + error!("AppendAllFrom::append_all_from should not get used"); self.ts1s.extend(src.ts1s.drain(..)); self.ts2s.extend(src.ts2s.drain(..)); self.cnts.extend(src.cnts.drain(..)); self.mins.extend(src.mins.drain(..)); self.maxs.extend(src.maxs.drain(..)); self.avgs.extend(src.avgs.drain(..)); + self.lsts.extend(src.lsts.drain(..)); } } @@ -396,6 +402,7 @@ where min: STY, max: STY, avg: f64, + lst: STY, filled_up_to: TsNano, last_seen_avg: f32, } @@ -404,12 +411,15 @@ impl BinsDim0TimeBinnerTy where STY: ScalarOps, { + pub fn type_name() -> &'static str { + any::type_name::() + } + pub fn new(binrange: BinnedRange, do_time_weight: bool, emit_empty_bins: bool) -> Self { // let ts1now = TsNano::from_ns(binrange.bin_off * binrange.bin_len.ns()); // let ts2 = ts1.add_dt_nano(binrange.bin_len.to_dt_nano()); - let ts1now = TsNano::from_ns(binrange.full_range().beg()); + let ts1now = TsNano::from_ns(binrange.nano_beg().ns()); let ts2now = ts1now.add_dt_nano(binrange.bin_len.to_dt_nano()); - let buf = ::Output::empty(); Self { ts1now, ts2now, @@ -422,6 +432,7 @@ where min: STY::zero_b(), max: STY::zero_b(), avg: 0., + lst: STY::zero_b(), filled_up_to: ts1now, last_seen_avg: 0., } @@ -444,8 +455,9 @@ where type Output = BinsDim0; fn ingest(&mut self, item: &mut Self::Input) { + trace_ingest!("<{} as TimeBinnerTy>::ingest {:?}", Self::type_name(), item); let mut count_before = 0; - for (((((&ts1, &ts2), &cnt), min), max), &avg) in item + for ((((((&ts1, &ts2), &cnt), min), max), &avg), lst) in item .ts1s .iter() .zip(&item.ts2s) @@ -453,9 +465,18 @@ where .zip(&item.mins) .zip(&item.maxs) .zip(&item.avgs) + .zip(&item.lsts) { if ts1 < self.ts1now.ns() { + if ts2 > self.ts1now.ns() { + error!("{} bad input grid mismatch", Self::type_name()); + continue; + } // warn!("encountered bin from time before {} {}", ts1, self.ts1now.ns()); + trace_ingest!("{} input bin before {}", Self::type_name(), TsNano::from_ns(ts1)); + self.min = min.clone(); + self.max = max.clone(); + self.lst = lst.clone(); count_before += 1; continue; } else { @@ -543,7 +564,7 @@ where if self.do_time_weight { let f = (self.ts2now.ns() - self.filled_up_to.ns()) as f64 / (self.ts2now.ns() - self.ts1now.ns()) as f64; - self.avg += self.last_seen_avg as f64 * f; + self.avg += self.lst.as_prim_f32_b() as f64 * f; self.filled_up_to = self.ts2now; } else { panic!("TODO non-time-weighted binning to be impl"); @@ -563,6 +584,7 @@ where self.out.mins.push_back(self.min.clone()); self.out.maxs.push_back(self.max.clone()); self.out.avgs.push_back(self.avg as f32); + self.out.lsts.push_back(self.lst.clone()); self.reset_agg(); } } @@ -823,6 +845,7 @@ impl CollectorType for BinsDim0Collector { vals.mins.append(&mut src.mins); vals.maxs.append(&mut src.maxs); vals.avgs.append(&mut src.avgs); + vals.lsts.append(&mut src.lsts); } fn set_range_complete(&mut self) { @@ -1377,6 +1400,7 @@ impl TimeBinned for BinsDim0 { dst.mins.extend(self.mins.drain(range.clone())); dst.maxs.extend(self.maxs.drain(range.clone())); dst.avgs.extend(self.avgs.drain(range.clone())); + dst.lsts.extend(self.lsts.drain(range.clone())); Ok(()) } else { let type_name = any::type_name::(); diff --git a/crates/items_2/src/channelevents.rs b/crates/items_2/src/channelevents.rs index c563cf1..8ff50c0 100644 --- a/crates/items_2/src/channelevents.rs +++ b/crates/items_2/src/channelevents.rs @@ -34,6 +34,9 @@ use std::fmt; use std::time::Duration; use std::time::SystemTime; +#[allow(unused)] +macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } + // TODO maybe rename to ChannelStatus? #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum ConnStatus { @@ -1062,26 +1065,21 @@ impl TimeBinnerTy for ChannelEventsTimeBinner { type Output = Box; fn ingest(&mut self, item: &mut Self::Input) { - trace!("{} INGEST {:?}", Self::type_name(), item); + trace_ingest!("{} INGEST {:?}", Self::type_name(), item); match item { ChannelEvents::Events(item) => { - if self.binner.is_none() { - let binner = item.time_binner_new(self.binrange.clone(), self.do_time_weight, self.emit_empty_bins); - self.binner = Some(binner); - } - match self.binner.as_mut() { - Some(binner) => binner.ingest(item.as_time_binnable_mut()), - None => { - error!("ingest without active binner item {item:?}"); - () - } - } + let binner = self.binner.get_or_insert_with(|| { + item.time_binner_new(self.binrange.clone(), self.do_time_weight, self.emit_empty_bins) + }); + binner.ingest(item.as_time_binnable_mut()) } ChannelEvents::Status(item) => { warn!("TODO consider channel status in time binning {item:?}"); } } + trace_ingest!("{} INGEST RETURN {:?}", Self::type_name(), item); } + fn bins_ready_count(&self) -> usize { match &self.binner { Some(binner) => binner.bins_ready_count(), diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index 8c1d58d..75b9d3d 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -566,6 +566,7 @@ impl TimeAggregatorCommonV0Trait for EventsDim0Aggregator { } fn common_ingest_unweight_range(&mut self, item: &Self::Input, r: core::ops::Range) { + panic!("TODO common_ingest_unweight_range"); for (&ts, val) in item.tss.range(r.clone()).zip(item.values.range(r)) { self.apply_event_unweight(val.clone()); self.count += 1; @@ -574,7 +575,7 @@ impl TimeAggregatorCommonV0Trait for EventsDim0Aggregator { } fn common_ingest_one_before(&mut self, item: &Self::Input, j: usize) { - //trace_ingest!("{self_name} ingest {:6} {:20} {:10?} BEFORE", i1, ts, val); + trace_ingest!("{} common_ingest_one_before {:?} {:?}", Self::type_name(), j, item,); self.apply_min_max_lst(item.values[j].clone()); self.last_ts = item.tss[j]; } @@ -676,7 +677,7 @@ impl EventsDim0Aggregator { } fn ingest_unweight(&mut self, item: &::Input) { - TimeAggregatorCommonV0Func::ingest_time_weight(self, item) + TimeAggregatorCommonV0Func::ingest_unweight(self, item) } fn ingest_time_weight(&mut self, item: &::Input) { @@ -1174,6 +1175,7 @@ impl TimeBinnerCommonV0Trait for EventsDim0TimeBinner { impl TimeBinner for EventsDim0TimeBinner { fn ingest(&mut self, item: &mut dyn TimeBinnable) { + trace_ingest!("{}::ingest {:?}", Self::type_name(), item); TimeBinnerCommonV0Func::ingest(self, item) } diff --git a/crates/items_2/src/items_2.rs b/crates/items_2/src/items_2.rs index 6fa1566..5978d32 100644 --- a/crates/items_2/src/items_2.rs +++ b/crates/items_2/src/items_2.rs @@ -1,5 +1,6 @@ pub mod accounting; pub mod binnedcollected; +pub mod binning; pub mod binsdim0; pub mod binsxbindim0; pub mod channelevents; diff --git a/crates/items_2/src/timebin.rs b/crates/items_2/src/timebin.rs index c224541..ecb5e6b 100644 --- a/crates/items_2/src/timebin.rs +++ b/crates/items_2/src/timebin.rs @@ -11,31 +11,13 @@ use std::collections::VecDeque; use std::ops::Range; #[allow(unused)] -macro_rules! trace_ingest { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! trace_init { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace_ingest_item { - ($($arg:tt)*) => { - if false { - info!($($arg)*); - } - }; -} +macro_rules! trace_ingest { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } #[allow(unused)] -macro_rules! trace2 { - ($($arg:tt)*) => { - if false { - trace!($($arg)*); - } - }; -} +macro_rules! trace_ingest_detail { ($($arg:tt)*) => ( if true { trace!($($arg)*); }) } pub trait TimeBinnerCommonV0Trait { type Input: RangeOverlapInfo + 'static; @@ -59,7 +41,7 @@ impl TimeBinnerCommonV0Func { B: TimeBinnerCommonV0Trait, { let self_name = B::type_name(); - trace_ingest_item!( + trace_ingest!( "TimeBinner for {} ingest common_range_current {:?} item {:?}", self_name, binner.common_range_current(), @@ -74,19 +56,20 @@ impl TimeBinnerCommonV0Func { // Or consume the input data. loop { while item.starts_after(B::common_range_current(binner)) { - trace_ingest_item!("{self_name} ignore item and cycle starts_after"); + trace_ingest!("{self_name} ignore item and cycle starts_after"); TimeBinnerCommonV0Func::cycle(binner); if !B::common_has_more_range(binner) { debug!("{self_name} no more bin in edges after starts_after"); return; } } - if item.ends_before(B::common_range_current(binner)) { - trace_ingest_item!("{self_name} ignore item ends_before"); - return; - } else { + // if item.ends_before(B::common_range_current(binner)) { + // trace_ingest_item!("{self_name} ignore item ends_before"); + // return; + // } + { if !B::common_has_more_range(binner) { - trace_ingest_item!("{self_name} no more bin in edges"); + trace_ingest!("{self_name} no more bin in edges"); return; } else { if let Some(item) = item @@ -95,10 +78,10 @@ impl TimeBinnerCommonV0Func { .downcast_mut::() { // TODO collect statistics associated with this request: - trace_ingest_item!("{self_name} FEED THE ITEM..."); + trace_ingest!("{self_name} FEED THE ITEM..."); TimeBinnerCommonV0Func::agg_ingest(binner, item); if item.ends_after(B::common_range_current(binner)) { - trace_ingest_item!( + trace_ingest!( "{self_name} FED ITEM, ENDS AFTER agg-range {:?}", B::common_range_current(binner) ); @@ -107,14 +90,18 @@ impl TimeBinnerCommonV0Func { warn!("{self_name} no more bin in edges after ingest and cycle"); return; } else { - trace_ingest_item!("{self_name} item fed, cycled, continue"); + trace_ingest!("{self_name} item fed, cycled, continue"); } } else { - trace_ingest_item!("{self_name} item fed, break"); + trace_ingest!("{self_name} item fed, break"); break; } } else { - error!("{self_name}::ingest unexpected item type"); + error!( + "{self_name}::ingest unexpected item type {} expected {}", + item.type_name(), + any::type_name::() + ); }; } } @@ -134,7 +121,7 @@ impl TimeBinnerCommonV0Func { B: TimeBinnerCommonV0Trait, { let self_name = B::type_name(); - trace_ingest_item!("{self_name}::push_in_progress push_empty {push_empty}"); + trace_ingest!("{self_name}::push_in_progress push_empty {push_empty}"); // TODO expand should be derived from AggKind. Is it still required after all? // TODO here, the expand means that agg will assume that the current value is kept constant during // the rest of the time range. @@ -158,7 +145,7 @@ impl TimeBinnerCommonV0Func { B: TimeBinnerCommonV0Trait, { let self_name = any::type_name::(); - trace_ingest_item!("{self_name}::cycle"); + trace_ingest!("{self_name}::cycle"); // TODO refactor this logic. let n = TimeBinnerCommonV0Trait::common_bins_ready_count(binner); TimeBinnerCommonV0Func::push_in_progress(binner, true); diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs index c117c69..97760d0 100644 --- a/crates/netpod/src/netpod.rs +++ b/crates/netpod/src/netpod.rs @@ -2475,6 +2475,18 @@ impl BinnedRange { pub fn nano_end(&self) -> TsNano { self.bin_len.times(self.bin_off + self.bin_cnt) } + + pub fn one_before_bin(&self) -> Self { + Self { + bin_len: self.bin_len, + bin_off: self.bin_off - 1, + bin_cnt: self.bin_cnt + 1, + } + } + + pub fn bin_len_dt_ms(&self) -> DtMs { + self.bin_len.to_dt_ms() + } } impl BinnedRange @@ -2501,7 +2513,7 @@ where }*/ let beg = self.bin_len.times(self.bin_off).as_u64(); let end = self.bin_len.times(self.bin_off + self.bin_cnt).as_u64(); - debug!("TODO make generic for pulse"); + panic!("TODO make generic for pulse"); NanoRange { beg, end } } diff --git a/crates/scyllaconn/src/schema.rs b/crates/scyllaconn/src/schema.rs index feeebe8..3ce099e 100644 --- a/crates/scyllaconn/src/schema.rs +++ b/crates/scyllaconn/src/schema.rs @@ -7,9 +7,17 @@ use scylla::Session as ScySession; #[derive(Debug, ThisError)] #[cstm(name = "ScyllaSchema")] pub enum Error { - Scylla, + Scylla(#[from] scylla::transport::errors::QueryError), } pub async fn schema(rt: RetentionTime, scyco: &ScyllaConfig, scy: &ScySession) -> Result<(), Error> { - todo!() + let table = "binned_scalar_f32"; + let cql = format!( + concat!("alter table {}.{}{}", " add lst float"), + &scyco.keyspace, + rt.table_prefix(), + table + ); + let _ = scy.query(cql, ()).await; + Ok(()) } diff --git a/crates/streams/src/timebin/gapfill.rs b/crates/streams/src/timebin/gapfill.rs index a7e4e89..12d6324 100644 --- a/crates/streams/src/timebin/gapfill.rs +++ b/crates/streams/src/timebin/gapfill.rs @@ -46,7 +46,10 @@ macro_rules! trace_handle { ($($arg:tt)*) => ( if true { trace!($($arg)*); } ) } #[cstm(name = "BinCachedGapFill")] pub enum Error { CacheReader(#[from] super::cached::reader::Error), - GapFromFiner, + #[error("GapFromFiner({0}, {1}, {2})")] + GapFromFiner(TsNano, TsNano, DtMs), + #[error("MissingBegFromFiner({0}, {1}, {2})")] + MissingBegFromFiner(TsNano, TsNano, DtMs), #[error("InputBeforeRange({0}, {1})")] InputBeforeRange(NanoRange, BinnedRange), SfDatabufferNotSupported, @@ -153,13 +156,24 @@ impl GapFill { for (&ts1, &ts2) in bins.ts1s.iter().zip(&bins.ts2s) { if let Some(last) = self.last_bin_ts2 { if ts1 != last.ns() { - return Err(Error::GapFromFiner); + return Err(Error::GapFromFiner( + TsNano::from_ns(ts1), + last, + self.range.bin_len_dt_ms(), + )); } + } else if ts1 != self.range.nano_beg().ns() { + return Err(Error::MissingBegFromFiner( + TsNano::from_ns(ts1), + self.range.nano_beg(), + self.range.bin_len_dt_ms(), + )); } self.last_bin_ts2 = Some(TsNano::from_ns(ts2)); } if bins.len() != 0 { - bins.clone().drain_into(&mut self.bins_for_cache_write, 0..bins.len()); + let mut bins2 = bins.clone(); + bins2.drain_into(&mut self.bins_for_cache_write, 0..bins2.len()); } if self.cache_usage.is_cache_write() { self.cache_write_intermediate()?; @@ -239,6 +253,7 @@ impl GapFill { self.range.bin_len.to_dt_ms() ); let range_finer = BinnedRange::from_nano_range(range, bin_len_finer); + let range_finer_one_before_bin = range_finer.one_before_bin(); let inp_finer = GapFill::new( self.dbgname.clone(), self.ch_conf.clone(), @@ -248,7 +263,7 @@ impl GapFill { self.log_level.clone(), self.ctx.clone(), self.series, - range_finer.clone(), + range_finer_one_before_bin, self.do_time_weight, self.bin_len_layers.clone(), self.cache_read_provider.clone(),