diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 301a1ae..c3e3ed6 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -122,10 +122,11 @@ impl PreBinnedQuery { let params = netpod::query_params(req.uri.query()); let ret = PreBinnedQuery { patch: PreBinnedPatchCoord { - range: NanoRange { - beg: params.get("beg").ok_or(Error::with_msg("missing beg"))?.parse()?, - end: params.get("end").ok_or(Error::with_msg("missing end"))?.parse()?, + spec: PreBinnedPatchGridSpec { + patch_t_len: params.get("patch_t_len").ok_or(Error::with_msg("missing patch_t_len"))?.parse()?, + bin_t_len: params.get("bin_t_len").ok_or(Error::with_msg("missing bin_t_len"))?.parse()?, }, + ix: params.get("patch_ix").ok_or(Error::with_msg("missing patch_ix"))?.parse()?, }, agg_kind: AggKind::DimXBins1, channel: Channel { @@ -195,7 +196,7 @@ pub struct PreBinnedValueStream { agg_kind: AggKind, node_config: Arc, open_check_local_file: Option> + Send>>>, - fut2: Option<()>, + fut2: Option> + Send>>>, } impl PreBinnedValueStream { @@ -215,17 +216,37 @@ impl PreBinnedValueStream { fn try_setup_fetch_prebinned_higher_res(&mut self) { info!("try to find a next better granularity for {:?}", self.patch_coord); - let g = self.patch_coord.range.end - self.patch_coord.range.beg; - match PreBinnedPatchRange::covering_range(self.patch_coord.range.clone(), 2, 0) { + let g = self.patch_coord.spec.bin_t_len; + let range = NanoRange { + beg: self.patch_coord.patch_beg(), + end: self.patch_coord.patch_end(), + }; + match PreBinnedPatchRange::covering_range(range, 2, 0) { Some(range) => { let h = range.grid_spec.bin_t_len; assert!(g / h > 1); assert!(g / h < 20); assert!(g % h == 0); info!("FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}", g, h, g/h, g%h, range); + let bin_size = range.grid_spec.bin_t_len; + let channel = self.channel.clone(); + let agg_kind = self.agg_kind.clone(); + let node_config = self.node_config.clone(); + let mut patch_it = PreBinnedPatchIterator::from_range(range); + let s = futures_util::stream::iter(patch_it) + .map(move |coord| { + PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone()) + }) + .flatten() + .map(move |k| { + info!("ITEM from sub res bin_size {} {:?}", bin_size, k); + k + }); + self.fut2 = Some(Box::pin(s)); } None => { - info!("NO BETTER GRAN FOUND FOR g {}", g); + error!("TODO NO BETTER GRAN FOUND FOR g {}", g); + todo!(); } } } @@ -240,7 +261,7 @@ impl Stream for PreBinnedValueStream { use Poll::*; 'outer: loop { break if let Some(fut) = self.fut2.as_mut() { - todo!() + fut.poll_next_unpin(cx) } else if let Some(fut) = self.open_check_local_file.as_mut() { match fut.poll_unpin(cx) { @@ -295,12 +316,12 @@ impl PreBinnedValueFetchedStream { pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc) -> Self { let nodeix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster); let node = &node_config.cluster.nodes[nodeix as usize]; + warn!("TODO defining property of a PreBinnedPatchCoord? patchlen + ix? binsize + patchix? binsize + patchsize + patchix?"); let uri: hyper::Uri = format!( - "http://{}:{}/api/1/prebinned?beg={}&end={}&channel_backend={}&channel_name={}&agg_kind={:?}", + "http://{}:{}/api/1/prebinned?{}&channel_backend={}&channel_name={}&agg_kind={:?}", node.host, node.port, - patch_coord.range.beg, - patch_coord.range.end, + patch_coord.to_url_params_strings(), channel.backend, channel.name, agg_kind, diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 2937803..e28649a 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -339,13 +339,26 @@ impl PreBinnedPatchRange { #[derive(Clone, Debug)] pub struct PreBinnedPatchCoord { - pub range: NanoRange, + pub spec: PreBinnedPatchGridSpec, + pub ix: u64, } impl PreBinnedPatchCoord { - pub fn bs(&self) -> u64 { - self.range.end - self.range.beg + pub fn bin_t_len(&self) -> u64 { + self.spec.bin_t_len + } + + pub fn patch_t_len(&self) -> u64 { + self.spec.patch_t_len + } + + pub fn patch_beg(&self) -> u64 { + self.spec.patch_t_len * self.ix + } + + pub fn patch_end(&self) -> u64 { + self.spec.patch_t_len * (self.ix + 1) } } @@ -377,10 +390,8 @@ impl Iterator for PreBinnedPatchIterator { } else { let ret = Self::Item { - range: NanoRange { - beg: (self.range.offset + self.ix) * self.range.grid_spec.patch_t_len, - end: (self.range.offset + self.ix + 1) * self.range.grid_spec.patch_t_len, - }, + spec: self.range.grid_spec.clone(), + ix: self.range.offset + self.ix, }; self.ix += 1; Some(ret)