diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 9c799e2..92d4df3 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -7,7 +7,8 @@ use bytes::{BufMut, Bytes, BytesMut}; use err::Error; use futures_core::Stream; use futures_util::StreamExt; -use netpod::BinSpecDimT; +use netpod::timeunits::SEC; +use netpod::{BinSpecDimT, NanoRange}; use netpod::{Node, ScalarType}; use serde::{Deserialize, Serialize}; use std::mem::size_of; @@ -523,6 +524,44 @@ impl MinMaxAvgScalarBinBatch { } } +pub enum Fits { + Empty, + Lower, + Greater, + Inside, + PartlyLower, + PartlyGreater, + PartlyLowerAndGreater, +} + +pub trait FitsInside { + fn fits_inside(&self, range: NanoRange) -> Fits; +} + +impl FitsInside for MinMaxAvgScalarBinBatch { + fn fits_inside(&self, range: NanoRange) -> Fits { + if self.ts1s.is_empty() { + Fits::Empty + } else { + let t1 = *self.ts1s.first().unwrap(); + let t2 = *self.ts2s.last().unwrap(); + if t2 <= range.beg { + Fits::Lower + } else if t1 >= range.end { + Fits::Greater + } else if t1 < range.beg && t2 > range.end { + Fits::PartlyLowerAndGreater + } else if t1 < range.beg { + Fits::PartlyLower + } else if t2 > range.end { + Fits::PartlyGreater + } else { + Fits::Inside + } + } + } +} + impl MinMaxAvgScalarBinBatch { #[allow(dead_code)] fn old_serialized(&self) -> Bytes { @@ -557,9 +596,10 @@ impl std::fmt::Debug for MinMaxAvgScalarBinBatch { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { write!( fmt, - "MinMaxAvgScalarBinBatch count {} ts1s {:?} counts {:?} avgs {:?}", + "MinMaxAvgScalarBinBatch count {} ts1s {:?} ts2s {:?} counts {:?} avgs {:?}", self.ts1s.len(), - self.ts1s, + self.ts1s.iter().map(|k| k / SEC).collect::>(), + self.ts2s.iter().map(|k| k / SEC).collect::>(), self.counts, self.avgs ) diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index 95006ad..b445d94 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -5,8 +5,9 @@ use futures_core::Stream; use futures_util::StreamExt; #[allow(unused_imports)] use netpod::log::*; -use netpod::RetStreamExt; use netpod::{AggKind, Channel, NodeConfig, PreBinnedPatchIterator}; +use netpod::{NanoRange, RetStreamExt}; +use std::future::ready; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -19,26 +20,42 @@ impl BinnedStream { pub fn new( patch_it: PreBinnedPatchIterator, channel: Channel, + range: NanoRange, agg_kind: AggKind, node_config: Arc, ) -> Self { + let patches: Vec<_> = patch_it.collect(); warn!("BinnedStream will open a PreBinnedValueStream"); - let inp = futures_util::stream::iter(patch_it) + for p in &patches { + info!("BinnedStream -> patch {:?}", p); + } + let inp = futures_util::stream::iter(patches.into_iter()) .map(move |coord| { PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone()) }) .flatten() .only_first_error() - .map(|k| { - match k { - Ok(ref k) => { - trace!("BinnedStream got good item {:?}", k); - } - Err(_) => { - error!("\n\n----------------------------------------------------- BinnedStream got error") - } + .filter_map({ + let range = range.clone(); + move |k: Result| { + let g = match k { + Ok(k) => { + use super::agg::{Fits, FitsInside}; + //info!("BinnedStream got good item {:?}", k); + match k.fits_inside(range.clone()) { + Fits::Inside => Some(Ok(k)), + _ => None, + } + } + Err(e) => { + error!( + "\n\n----------------------------------------------------- BinnedStream got error" + ); + Some(Err(e)) + } + }; + ready(g) } - k }); Self { inp: Box::pin(inp) } } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 75912b1..397195b 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,6 +1,6 @@ -use crate::agg::{IntoBinnedT, MinMaxAvgScalarBinBatch, MinMaxAvgScalarEventBatch}; +use crate::agg::MinMaxAvgScalarEventBatch; use crate::binnedstream::BinnedStream; -use crate::cache::pbvfs::{PreBinnedHttpFrame, PreBinnedValueFetchedStream}; +use crate::cache::pbv::PreBinnedValueByteStream; use crate::frame::makeframe::make_frame; use crate::merge::MergedMinMaxAvgScalarStream; use crate::raw::EventsQuery; @@ -8,14 +8,14 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use err::Error; use futures_core::Stream; -use futures_util::{pin_mut, FutureExt, StreamExt, TryStreamExt}; +use futures_util::{pin_mut, StreamExt}; use hyper::Response; use netpod::{ - AggKind, BinSpecDimT, Channel, Cluster, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, - PreBinnedPatchRange, ToNanos, + AggKind, Channel, Cluster, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, + ToNanos, }; use serde::{Deserialize, Serialize}; -use std::future::{ready, Future}; +use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -24,6 +24,7 @@ use tokio::io::{AsyncRead, ReadBuf}; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; +pub mod pbv; pub mod pbvfs; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -67,8 +68,6 @@ pub async fn binned_bytes_for_http( node_config: Arc, query: &Query, ) -> Result { - let agg_kind = AggKind::DimXBins1; - let channel_config = super::channelconfig::read_local_config(&query.channel, node_config.clone()).await?; let entry; { @@ -104,7 +103,8 @@ pub async fn binned_bytes_for_http( let s1 = BinnedStream::new( PreBinnedPatchIterator::from_range(spec), query.channel.clone(), - agg_kind, + query.range.clone(), + query.agg_kind.clone(), node_config.clone(), ); let ret = BinnedBytesForHttpStream::new(s1); @@ -205,221 +205,6 @@ pub fn pre_binned_bytes_for_http( Ok(ret) } -pub struct PreBinnedValueByteStream { - inp: PreBinnedValueStream, - errored: bool, - completed: bool, -} - -impl PreBinnedValueByteStream { - pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc) -> Self { - Self { - inp: PreBinnedValueStream::new(patch, channel, agg_kind, node_config), - errored: false, - completed: false, - } - } -} - -impl Stream for PreBinnedValueByteStream { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - if self.completed { - panic!("poll_next on completed"); - } - if self.errored { - self.completed = true; - return Ready(None); - } - match self.inp.poll_next_unpin(cx) { - Ready(Some(item)) => match make_frame::(&item) { - Ok(buf) => Ready(Some(Ok(buf.freeze()))), - Err(e) => { - self.errored = true; - Ready(Some(Err(e.into()))) - } - }, - Ready(None) => Ready(None), - Pending => Pending, - } - } -} - -pub struct PreBinnedValueStream { - patch_coord: PreBinnedPatchCoord, - channel: Channel, - agg_kind: AggKind, - node_config: Arc, - open_check_local_file: Option> + Send>>>, - fut2: Option> + Send>>>, -} - -impl PreBinnedValueStream { - pub fn new( - patch_coord: PreBinnedPatchCoord, - channel: Channel, - agg_kind: AggKind, - node_config: Arc, - ) -> Self { - let node_ix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster); - assert!(node_ix == node_config.node.id); - Self { - patch_coord, - channel, - agg_kind, - node_config, - open_check_local_file: None, - fut2: None, - } - } - - fn try_setup_fetch_prebinned_higher_res(&mut self) { - info!("try to find a next better granularity for {:?}", self.patch_coord); - let g = self.patch_coord.bin_t_len(); - let range = NanoRange { - beg: self.patch_coord.patch_beg(), - end: self.patch_coord.patch_end(), - }; - match PreBinnedPatchRange::covering_range(range, self.patch_coord.bin_count() + 1) { - Some(range) => { - let h = range.grid_spec.bin_t_len(); - info!( - "FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}", - g, - h, - g / h, - g % h, - range - ); - assert!(g / h > 1); - assert!(g / h < 20); - assert!(g % h == 0); - let bin_size = range.grid_spec.bin_t_len(); - let channel = self.channel.clone(); - let agg_kind = self.agg_kind.clone(); - let node_config = self.node_config.clone(); - let patch_it = PreBinnedPatchIterator::from_range(range); - let s = futures_util::stream::iter(patch_it) - .map(move |coord| { - PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone()) - }) - .flatten() - .map(move |k| { - error!("NOTE NOTE NOTE try_setup_fetch_prebinned_higher_res ITEM from sub res bin_size {} {:?}", bin_size, k); - k - }); - self.fut2 = Some(Box::pin(s)); - } - None => { - warn!("no better resolution found for g {}", g); - let evq = EventsQuery { - channel: self.channel.clone(), - range: NanoRange { - beg: self.patch_coord.patch_beg(), - end: self.patch_coord.patch_end(), - }, - agg_kind: self.agg_kind.clone(), - }; - assert!(self.patch_coord.patch_t_len() % self.patch_coord.bin_t_len() == 0); - let count = self.patch_coord.patch_t_len() / self.patch_coord.bin_t_len(); - let spec = BinSpecDimT { - bs: self.patch_coord.bin_t_len(), - ts1: self.patch_coord.patch_beg(), - ts2: self.patch_coord.patch_end(), - count, - }; - let evq = Arc::new(evq); - error!("try_setup_fetch_prebinned_higher_res apply all requested transformations and T-binning"); - let s1 = MergedFromRemotes::new(evq, self.node_config.cluster.clone()); - let s2 = s1 - .map(|k| { - if k.is_err() { - error!("\n\n\n.................. try_setup_fetch_prebinned_higher_res got ERROR"); - } else { - trace!("try_setup_fetch_prebinned_higher_res got some item from MergedFromRemotes"); - } - k - }) - .into_binned_t(spec) - .map_ok({ - let mut a = MinMaxAvgScalarBinBatch::empty(); - move |k| { - a.push_single(&k); - if a.len() > 0 { - let z = std::mem::replace(&mut a, MinMaxAvgScalarBinBatch::empty()); - Some(z) - } else { - None - } - } - }) - .filter_map(|k| { - let g = match k { - Ok(Some(k)) => Some(Ok(k)), - Ok(None) => None, - Err(e) => Some(Err(e)), - }; - ready(g) - }) - .take_while({ - let mut run = true; - move |k| { - if !run { - ready(false) - } else { - if k.is_err() { - run = false; - } - ready(true) - } - } - }); - self.fut2 = Some(Box::pin(s2)); - } - } - } -} - -impl Stream for PreBinnedValueStream { - // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - 'outer: loop { - break if let Some(fut) = self.fut2.as_mut() { - fut.poll_next_unpin(cx) - } else if let Some(fut) = self.open_check_local_file.as_mut() { - match fut.poll_unpin(cx) { - Ready(Ok(_file)) => err::todoval(), - Ready(Err(e)) => match e.kind() { - std::io::ErrorKind::NotFound => { - error!("TODO LOCAL CACHE FILE NOT FOUND"); - self.try_setup_fetch_prebinned_higher_res(); - continue 'outer; - } - _ => { - error!("File I/O error: {:?}", e); - Ready(Some(Err(e.into()))) - } - }, - Pending => Pending, - } - } else { - #[allow(unused_imports)] - use std::os::unix::fs::OpenOptionsExt; - let mut opts = std::fs::OpenOptions::new(); - opts.read(true); - let fut = async { tokio::fs::OpenOptions::from(opts).open("/DOESNOTEXIST").await }; - self.open_check_local_file = Some(Box::pin(fut)); - continue 'outer; - }; - } - } -} - pub struct HttpBodyAsAsyncRead { inp: Response, left: Bytes, diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs new file mode 100644 index 0000000..22335c8 --- /dev/null +++ b/disk/src/cache/pbv.rs @@ -0,0 +1,236 @@ +use crate::agg::{IntoBinnedT, MinMaxAvgScalarBinBatch}; +use crate::cache::pbvfs::{PreBinnedHttpFrame, PreBinnedValueFetchedStream}; +use crate::cache::{node_ix_for_patch, MergedFromRemotes}; +use crate::frame::makeframe::make_frame; +use crate::raw::EventsQuery; +use bytes::Bytes; +use err::Error; +use futures_core::Stream; +use futures_util::{FutureExt, StreamExt, TryStreamExt}; +use netpod::log::*; +use netpod::{ + AggKind, BinSpecDimT, Channel, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, + PreBinnedPatchRange, +}; +use std::future::{ready, Future}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +pub struct PreBinnedValueByteStream { + inp: PreBinnedValueStream, + errored: bool, + completed: bool, +} + +impl PreBinnedValueByteStream { + pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc) -> Self { + Self { + inp: PreBinnedValueStream::new(patch, channel, agg_kind, node_config), + errored: false, + completed: false, + } + } +} + +impl Stream for PreBinnedValueByteStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + if self.completed { + panic!("poll_next on completed"); + } + if self.errored { + self.completed = true; + return Ready(None); + } + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => match make_frame::(&item) { + Ok(buf) => Ready(Some(Ok(buf.freeze()))), + Err(e) => { + self.errored = true; + Ready(Some(Err(e.into()))) + } + }, + Ready(None) => Ready(None), + Pending => Pending, + } + } +} + +pub struct PreBinnedValueStream { + patch_coord: PreBinnedPatchCoord, + channel: Channel, + agg_kind: AggKind, + node_config: Arc, + open_check_local_file: Option> + Send>>>, + fut2: Option> + Send>>>, +} + +impl PreBinnedValueStream { + pub fn new( + patch_coord: PreBinnedPatchCoord, + channel: Channel, + agg_kind: AggKind, + node_config: Arc, + ) -> Self { + let node_ix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster); + assert!(node_ix == node_config.node.id); + Self { + patch_coord, + channel, + agg_kind, + node_config, + open_check_local_file: None, + fut2: None, + } + } + + fn try_setup_fetch_prebinned_higher_res(&mut self) { + info!("try to find a next better granularity for {:?}", self.patch_coord); + let g = self.patch_coord.bin_t_len(); + let range = NanoRange { + beg: self.patch_coord.patch_beg(), + end: self.patch_coord.patch_end(), + }; + match PreBinnedPatchRange::covering_range(range, self.patch_coord.bin_count() + 1) { + Some(range) => { + let h = range.grid_spec.bin_t_len(); + info!( + "FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}", + g, + h, + g / h, + g % h, + range + ); + assert!(g / h > 1); + assert!(g / h < 20); + assert!(g % h == 0); + let bin_size = range.grid_spec.bin_t_len(); + let channel = self.channel.clone(); + let agg_kind = self.agg_kind.clone(); + let node_config = self.node_config.clone(); + let patch_it = PreBinnedPatchIterator::from_range(range); + let s = futures_util::stream::iter(patch_it) + .map(move |coord| { + PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone()) + }) + .flatten() + .map(move |k| { + error!("NOTE NOTE NOTE try_setup_fetch_prebinned_higher_res ITEM from sub res bin_size {} {:?}", bin_size, k); + k + }); + self.fut2 = Some(Box::pin(s)); + } + None => { + warn!("no better resolution found for g {}", g); + let evq = EventsQuery { + channel: self.channel.clone(), + range: NanoRange { + beg: self.patch_coord.patch_beg(), + end: self.patch_coord.patch_end(), + }, + agg_kind: self.agg_kind.clone(), + }; + assert!(self.patch_coord.patch_t_len() % self.patch_coord.bin_t_len() == 0); + error!("try_setup_fetch_prebinned_higher_res apply all requested transformations and T-binning"); + let count = self.patch_coord.patch_t_len() / self.patch_coord.bin_t_len(); + // TODO use a ctor, remove from BinSpecDimT the redundant variable. + // If given a timestamp range, verify that it divides. + // For ranges, use a range type. + let spec = BinSpecDimT { + bs: self.patch_coord.bin_t_len(), + ts1: self.patch_coord.patch_beg(), + ts2: self.patch_coord.patch_end(), + count, + }; + let evq = Arc::new(evq); + let s1 = MergedFromRemotes::new(evq, self.node_config.cluster.clone()); + let s2 = s1 + .map(|k| { + if k.is_err() { + error!("\n\n\n.................. try_setup_fetch_prebinned_higher_res got ERROR"); + } else { + trace!("try_setup_fetch_prebinned_higher_res got some item from MergedFromRemotes"); + } + k + }) + .into_binned_t(spec) + .map_ok({ + let mut a = MinMaxAvgScalarBinBatch::empty(); + move |k| { + a.push_single(&k); + if a.len() > 0 { + let z = std::mem::replace(&mut a, MinMaxAvgScalarBinBatch::empty()); + Some(z) + } else { + None + } + } + }) + .filter_map(|k| { + let g = match k { + Ok(Some(k)) => Some(Ok(k)), + Ok(None) => None, + Err(e) => Some(Err(e)), + }; + ready(g) + }) + .take_while({ + let mut run = true; + move |k| { + if !run { + ready(false) + } else { + if k.is_err() { + run = false; + } + ready(true) + } + } + }); + self.fut2 = Some(Box::pin(s2)); + } + } + } +} + +impl Stream for PreBinnedValueStream { + // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + 'outer: loop { + break if let Some(fut) = self.fut2.as_mut() { + fut.poll_next_unpin(cx) + } else if let Some(fut) = self.open_check_local_file.as_mut() { + match fut.poll_unpin(cx) { + Ready(Ok(_file)) => err::todoval(), + Ready(Err(e)) => match e.kind() { + std::io::ErrorKind::NotFound => { + error!("TODO LOCAL CACHE FILE NOT FOUND"); + self.try_setup_fetch_prebinned_higher_res(); + continue 'outer; + } + _ => { + error!("File I/O error: {:?}", e); + Ready(Some(Err(e.into()))) + } + }, + Pending => Pending, + } + } else { + #[allow(unused_imports)] + use std::os::unix::fs::OpenOptionsExt; + let mut opts = std::fs::OpenOptions::new(); + opts.read(true); + let fut = async { tokio::fs::OpenOptions::from(opts).open("/DOESNOTEXIST").await }; + self.open_check_local_file = Some(Box::pin(fut)); + continue 'outer; + }; + } + } +} diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index c137057..65279b7 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -40,6 +40,7 @@ impl PreBinnedValueFetchedStream { ) .parse() .unwrap(); + info!("PreBinnedValueFetchedStream open uri {}", uri); Self { uri, resfut: None, diff --git a/disk/src/gen.rs b/disk/src/gen.rs index c711b6d..e6d31ba 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -10,16 +10,6 @@ use tokio::io::AsyncWriteExt; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; -#[test] -fn test_gen_test_data() { - let res = taskrun::run(async { - gen_test_data().await?; - Ok(()) - }); - info!("{:?}", res); - res.unwrap(); -} - pub async fn gen_test_data() -> Result<(), Error> { let data_base_path = PathBuf::from("../tmpdata"); let ksprefix = String::from("ks"); diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index d9f1c45..196d4a2 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -153,7 +153,12 @@ async fn raw_conn_handler_inner_try( .into_binned_x_bins_1(); while let Some(item) = s1.next().await { if let Ok(k) = &item { - trace!("???????????????? emit item ts0: {:?}", k.tss.first()); + trace!( + "emit items {} {:?} {:?}", + k.tss.len(), + k.tss.first().map(|k| k / 1000000000), + k.tss.last().map(|k| k / 1000000000) + ); } match make_frame::(&item) { Ok(buf) => match netout.write(&buf).await { diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index ba97c10..5dbcd80 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -236,8 +236,8 @@ async fn binned(req: Request, node_config: Arc) -> Result, node_config: Arc) -> Result, Error> { let (head, _body) = req.into_parts(); let q = PreBinnedQuery::from_request(&head)?; - let span1 = span!(Level::INFO, "httpret::prebinned", bin_t_len = 0); - span1.record("bin_t_len", &q.patch.bin_t_len()); + let desc = format!("pre-b-{}", q.patch.bin_t_len() / 1000000000); + let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str()); span1.in_scope(|| { trace!("prebinned"); let ret = match disk::cache::pre_binned_bytes_for_http(node_config, &q) { diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 7ed2ac4..0e3ad1e 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -230,7 +230,7 @@ impl BinSpecDimT { } } -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct PreBinnedPatchGridSpec { bin_t_len: u64, } @@ -281,6 +281,17 @@ impl PreBinnedPatchGridSpec { } } +impl std::fmt::Debug for PreBinnedPatchGridSpec { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + fmt, + "PreBinnedPatchGridSpec {{ bin_t_len: {:?}, patch_t_len(): {:?} }}", + self.bin_t_len / SEC, + self.patch_t_len() / SEC, + ) + } +} + const BIN_T_LEN_OPTIONS: [u64; 6] = [SEC * 10, MIN * 10, HOUR, HOUR * 4, DAY, DAY * 4]; const PATCH_T_LEN_OPTIONS: [u64; 6] = [MIN * 10, HOUR, HOUR * 4, DAY, DAY * 4, DAY * 12]; @@ -345,17 +356,18 @@ impl PreBinnedPatchRange { let t = BIN_T_LEN_OPTIONS[i1]; //info!("look at threshold {} bs {}", t, bs); if t <= bs { - let bs = t; - let ts1 = range.beg / bs * bs; - let _ts2 = (range.end + bs - 1) / bs * bs; - let count = range.delta() / bs; - let offset = ts1 / bs; + let bin_t_len = t; + let grid_spec = PreBinnedPatchGridSpec { bin_t_len }; + let pl = grid_spec.patch_t_len(); + let ts1 = range.beg / pl * pl; + let ts2 = (range.end + pl - 1) / pl * pl; + let count = (ts2 - ts1) / pl; + let offset = ts1 / pl; break Some(Self { - grid_spec: PreBinnedPatchGridSpec { bin_t_len: bs }, + grid_spec, count, offset, }); - } else { } } } diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index e476674..d341255 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use tracing::{debug, error, info, trace, warn}; fn test_cluster() -> Cluster { - let nodes = (0..1) + let nodes = (0..13) .into_iter() .map(|id| { let node = Node { @@ -22,7 +22,7 @@ fn test_cluster() -> Cluster { port_raw: 8360 + id as u16 + 100, data_base_path: format!("../tmpdata/node{:02}", id).into(), ksprefix: "ks".into(), - split: 0, + split: id, }; Arc::new(node) }) @@ -131,6 +131,16 @@ async fn get_cached_0_inner() -> Result<(), Error> { Ok(()) } +#[test] +fn test_gen_test_data() { + let res = taskrun::run(async { + disk::gen::gen_test_data().await?; + Ok(()) + }); + info!("{:?}", res); + res.unwrap(); +} + #[test] fn bufs() { use bytes::{Buf, BufMut}; diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index edd628c..f07ce57 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -56,7 +56,7 @@ pub fn tracing_init() { .with_thread_names(true) //.with_max_level(tracing::Level::INFO) .with_env_filter(tracing_subscriber::EnvFilter::new( - "info,retrieval=trace,retrieval::test=trace,tokio_postgres=info", + "info,retrieval=trace,retrieval::test=trace,disk::raw::conn=trace,tokio_postgres=info", )) .init(); }