diff --git a/src/timebin/cached/reader.rs b/src/timebin/cached/reader.rs index 7b3bf7f..67fa5a4 100644 --- a/src/timebin/cached/reader.rs +++ b/src/timebin/cached/reader.rs @@ -24,6 +24,7 @@ autoerr::create_error_v1!( ChannelSend, ChannelRecv, Scylla(String), + PrebinnedPartitioningInvalid, }, ); @@ -33,12 +34,35 @@ pub type BinsReadResErr = streams::timebin::cached::reader::Error; pub type BinsReadRes = Result, BinsReadResErr>; pub type BinsReadFutBoxed = Pin + Send>>; -pub fn off_max() -> u64 { - 1000 +pub enum PrebinnedPartitioning { + Sec1, + Sec10, + Min1, + Min10, + Hour1, } -pub fn part_len(bin_len: DtMs) -> DtMs { - DtMs::from_ms_u64(bin_len.ms() * off_max()) +impl PrebinnedPartitioning { + pub fn msp_div(&self) -> DtMs { + use PrebinnedPartitioning::*; + match self { + Sec1 => DtMs::from_ms_u64(1000 * 60 * 10), + Sec10 => DtMs::from_ms_u64(1000 * 60 * 60 * 2), + Min1 => DtMs::from_ms_u64(1000 * 60 * 60 * 8), + Min10 => DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 4), + Hour1 => DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 28), + } + } +} + +impl TryFrom for PrebinnedPartitioning { + type Error = Error; + + fn try_from(value: DtMs) -> Result { + match value { + _ => Err(Error::PrebinnedPartitioningInvalid), + } + } } pub struct EventsReading { @@ -179,18 +203,26 @@ impl Stream for CachedReader { } } else { if self.ts1next < self.range.nano_end() { - let div = part_len(self.bin_len).ns(); - let msp = self.ts1next.ns() / div; - let off = (self.ts1next.ns() - div * msp) / self.bin_len.ns(); - let off2 = (self.range.nano_end().ns() - div * msp) / self.bin_len.ns(); - let off2 = off2.min(off_max()); - self.ts1next = TsNano::from_ns(self.bin_len.ns() * off2 + div * msp); - let offs = off as u32..off2 as u32; - let fut = self - .cache_read_provider - .read(self.series, self.bin_len, msp, offs); - self.reading = Some(Box::pin(fut)); - continue; + match PrebinnedPartitioning::try_from(self.range.bin_len_dt_ms()) { + Ok(partt) => { + let binlen = self.bin_len.ns(); + let div = partt.msp_div().ns(); + let msp = self.ts1next.ns() / div; + let off1 = (self.ts1next.ns() - div * msp) / binlen; + let off2 = (self.range.nano_end().ns() - div * msp) / binlen; + self.ts1next = TsNano::from_ns(binlen * off2 + div * msp); + let offs = off1 as u32..off2 as u32; + let fut = + self.cache_read_provider + .read(self.series, self.bin_len, msp, offs); + self.reading = Some(Box::pin(fut)); + continue; + } + Err(_) => { + error!("bad prebinned partitioning {:?}", self.range); + Ready(None) + } + } } else { Ready(None) } diff --git a/src/timebinnedjson.rs b/src/timebinnedjson.rs index e27ed6f..4388792 100644 --- a/src/timebinnedjson.rs +++ b/src/timebinnedjson.rs @@ -264,7 +264,6 @@ async fn timebinned_stream( cache_read_provider: Arc, events_read_provider: Arc, ) -> Result>> + Send>>, Error> { - use netpod::query::CacheUsage; let do_time_weight = true; let bin_len_layers = if let Some(subgrids) = query.subgrids() { subgrids