WIP prebinned

This commit is contained in:
Dominik Werder
2024-12-09 19:22:32 +01:00
parent 309c995ad7
commit 1af83c30d8
2 changed files with 48 additions and 17 deletions

View File

@@ -24,6 +24,7 @@ autoerr::create_error_v1!(
ChannelSend,
ChannelRecv,
Scylla(String),
PrebinnedPartitioningInvalid,
},
);
@@ -33,12 +34,35 @@ pub type BinsReadResErr = streams::timebin::cached::reader::Error;
pub type BinsReadRes = Result<Option<BinsBoxed>, BinsReadResErr>;
pub type BinsReadFutBoxed = Pin<Box<dyn Future<Output = BinsReadRes> + Send>>;
pub fn off_max() -> u64 {
1000
pub enum PrebinnedPartitioning {
Sec1,
Sec10,
Min1,
Min10,
Hour1,
}
pub fn part_len(bin_len: DtMs) -> DtMs {
DtMs::from_ms_u64(bin_len.ms() * off_max())
impl PrebinnedPartitioning {
pub fn msp_div(&self) -> DtMs {
use PrebinnedPartitioning::*;
match self {
Sec1 => DtMs::from_ms_u64(1000 * 60 * 10),
Sec10 => DtMs::from_ms_u64(1000 * 60 * 60 * 2),
Min1 => DtMs::from_ms_u64(1000 * 60 * 60 * 8),
Min10 => DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 4),
Hour1 => DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 28),
}
}
}
impl TryFrom<DtMs> for PrebinnedPartitioning {
type Error = Error;
fn try_from(value: DtMs) -> Result<Self, Self::Error> {
match value {
_ => Err(Error::PrebinnedPartitioningInvalid),
}
}
}
pub struct EventsReading {
@@ -179,18 +203,26 @@ impl Stream for CachedReader {
}
} else {
if self.ts1next < self.range.nano_end() {
let div = part_len(self.bin_len).ns();
let msp = self.ts1next.ns() / div;
let off = (self.ts1next.ns() - div * msp) / self.bin_len.ns();
let off2 = (self.range.nano_end().ns() - div * msp) / self.bin_len.ns();
let off2 = off2.min(off_max());
self.ts1next = TsNano::from_ns(self.bin_len.ns() * off2 + div * msp);
let offs = off as u32..off2 as u32;
let fut = self
.cache_read_provider
.read(self.series, self.bin_len, msp, offs);
self.reading = Some(Box::pin(fut));
continue;
match PrebinnedPartitioning::try_from(self.range.bin_len_dt_ms()) {
Ok(partt) => {
let binlen = self.bin_len.ns();
let div = partt.msp_div().ns();
let msp = self.ts1next.ns() / div;
let off1 = (self.ts1next.ns() - div * msp) / binlen;
let off2 = (self.range.nano_end().ns() - div * msp) / binlen;
self.ts1next = TsNano::from_ns(binlen * off2 + div * msp);
let offs = off1 as u32..off2 as u32;
let fut =
self.cache_read_provider
.read(self.series, self.bin_len, msp, offs);
self.reading = Some(Box::pin(fut));
continue;
}
Err(_) => {
error!("bad prebinned partitioning {:?}", self.range);
Ready(None)
}
}
} else {
Ready(None)
}

View File

@@ -264,7 +264,6 @@ async fn timebinned_stream(
cache_read_provider: Arc<dyn CacheReadProvider>,
events_read_provider: Arc<dyn EventsReadProvider>,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn CollectableDyn>>> + Send>>, Error> {
use netpod::query::CacheUsage;
let do_time_weight = true;
let bin_len_layers = if let Some(subgrids) = query.subgrids() {
subgrids