From 50202c33c286cfc4307a04b69da318626d5c495e Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 15 Apr 2021 11:42:54 +0200 Subject: [PATCH] Find the next higher res binning --- disk/src/cache.rs | 87 +++++++++++++++++++++++++++++------------------ netpod/src/lib.rs | 44 ++++++++++++++---------- 2 files changed, 80 insertions(+), 51 deletions(-) diff --git a/disk/src/cache.rs b/disk/src/cache.rs index bc75749..301a1ae 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -8,7 +8,7 @@ use futures_core::Stream; use futures_util::{StreamExt, FutureExt, pin_mut, TryFutureExt}; use bytes::{Bytes, BytesMut, BufMut}; use chrono::{DateTime, Utc}; -use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchRange, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel, NodeConfig}; +use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchRange, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel, NodeConfig, PreBinnedPatchGridSpec}; use crate::agg::MinMaxAvgScalarBinBatch; use http::uri::Scheme; use tiny_keccak::Hasher; @@ -54,7 +54,7 @@ pub fn binned_bytes_for_http(node_config: Arc, query: &Query) -> Res // TODO // Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches. - let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count); + let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count, 0); match grid { Some(spec) => { info!("GOT PreBinnedPatchGridSpec: {:?}", spec); @@ -170,7 +170,6 @@ impl Stream for PreBinnedValueByteStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - error!("PreBinnedValueByteStream poll_next"); use Poll::*; match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { @@ -196,6 +195,7 @@ pub struct PreBinnedValueStream { agg_kind: AggKind, node_config: Arc, open_check_local_file: Option> + Send>>>, + fut2: Option<()>, } impl PreBinnedValueStream { @@ -209,6 +209,24 @@ impl PreBinnedValueStream { agg_kind, node_config, open_check_local_file: None, + fut2: None, + } + } + + fn try_setup_fetch_prebinned_higher_res(&mut self) { + info!("try to find a next better granularity for {:?}", self.patch_coord); + let g = self.patch_coord.range.end - self.patch_coord.range.beg; + match PreBinnedPatchRange::covering_range(self.patch_coord.range.clone(), 2, 0) { + Some(range) => { + let h = range.grid_spec.bin_t_len; + assert!(g / h > 1); + assert!(g / h < 20); + assert!(g % h == 0); + info!("FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}", g, h, g/h, g%h, range); + } + None => { + info!("NO BETTER GRAN FOUND FOR g {}", g); + } } } @@ -221,39 +239,42 @@ impl Stream for PreBinnedValueStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; 'outer: loop { - break match self.open_check_local_file.as_mut() { - None => { - use std::os::unix::fs::OpenOptionsExt; - let mut opts = std::fs::OpenOptions::new(); - opts.read(true); - let fut = async { - tokio::fs::OpenOptions::from(opts) - .open("/DOESNOTEXIST").await - }; - self.open_check_local_file = Some(Box::pin(fut)); - continue 'outer; - } - Some(fut) => { - match fut.poll_unpin(cx) { - Ready(Ok(file)) => { - todo!("IMPLEMENT READ FROM LOCAL CACHE"); - Pending - } - Ready(Err(e)) => { - match e.kind() { - std::io::ErrorKind::NotFound => { - warn!("TODO LOCAL CACHE FILE NOT FOUND"); - } - _ => { - error!("File I/O error: {:?}", e); - } - } - Ready(Some(Err(e.into()))) - } - Pending => Pending, + break if let Some(fut) = self.fut2.as_mut() { + todo!() + } + else if let Some(fut) = self.open_check_local_file.as_mut() { + match fut.poll_unpin(cx) { + Ready(Ok(file)) => { + todo!("IMPLEMENT READ FROM LOCAL CACHE"); + Pending } + Ready(Err(e)) => { + match e.kind() { + std::io::ErrorKind::NotFound => { + warn!("TODO LOCAL CACHE FILE NOT FOUND"); + self.try_setup_fetch_prebinned_higher_res(); + continue 'outer; + } + _ => { + error!("File I/O error: {:?}", e); + Ready(Some(Err(e.into()))) + } + } + } + Pending => Pending, } } + else { + use std::os::unix::fs::OpenOptionsExt; + let mut opts = std::fs::OpenOptions::new(); + opts.read(true); + let fut = async { + tokio::fs::OpenOptions::from(opts) + .open("/DOESNOTEXIST").await + }; + self.open_check_local_file = Some(Box::pin(fut)); + continue 'outer; + } } } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index a74b399..2937803 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -276,7 +276,7 @@ pub struct PreBinnedPatchRange { impl PreBinnedPatchRange { - pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option { + pub fn covering_range(range: NanoRange, min_bin_count: u64, finer: u64) -> Option { use timeunits::*; assert!(min_bin_count >= 1); assert!(min_bin_count <= 2000); @@ -292,6 +292,7 @@ impl PreBinnedPatchRange { DAY, DAY * 4, ]; + let mut found_count = 0; let mut i1 = thresholds.len(); loop { if i1 <= 0 { @@ -302,25 +303,32 @@ impl PreBinnedPatchRange { let t = thresholds[i1]; //info!("look at threshold {} bs {}", t, bs); if t <= bs { - let bs = t; - let ts1 = range.beg / bs * bs; - let ts2 = (range.end + bs - 1) / bs * bs; - let count = range.delta() / bs; - let patch_t_len = if i1 >= thresholds.len() - 1 { - bs * 8 + found_count += 1; + if found_count > finer { + let bs = t; + let ts1 = range.beg / bs * bs; + let ts2 = (range.end + bs - 1) / bs * bs; + let count = range.delta() / bs; + let patch_t_len = if i1 >= thresholds.len() - 1 { + bs * 8 + } + else { + thresholds[i1 + 1] * 8 + }; + let offset = ts1 / bs; + break Some(Self { + grid_spec: PreBinnedPatchGridSpec { + bin_t_len: bs, + patch_t_len, + }, + count, + offset, + }); } else { - thresholds[i1 + 1] * 8 - }; - let offset = ts1 / bs; - break Some(Self { - grid_spec: PreBinnedPatchGridSpec { - bin_t_len: bs, - patch_t_len, - }, - count, - offset, - }); + } + } + else { } } }