WIP
This commit is contained in:
@@ -122,10 +122,11 @@ impl PreBinnedQuery {
|
||||
let params = netpod::query_params(req.uri.query());
|
||||
let ret = PreBinnedQuery {
|
||||
patch: PreBinnedPatchCoord {
|
||||
range: NanoRange {
|
||||
beg: params.get("beg").ok_or(Error::with_msg("missing beg"))?.parse()?,
|
||||
end: params.get("end").ok_or(Error::with_msg("missing end"))?.parse()?,
|
||||
spec: PreBinnedPatchGridSpec {
|
||||
patch_t_len: params.get("patch_t_len").ok_or(Error::with_msg("missing patch_t_len"))?.parse()?,
|
||||
bin_t_len: params.get("bin_t_len").ok_or(Error::with_msg("missing bin_t_len"))?.parse()?,
|
||||
},
|
||||
ix: params.get("patch_ix").ok_or(Error::with_msg("missing patch_ix"))?.parse()?,
|
||||
},
|
||||
agg_kind: AggKind::DimXBins1,
|
||||
channel: Channel {
|
||||
@@ -195,7 +196,7 @@ pub struct PreBinnedValueStream {
|
||||
agg_kind: AggKind,
|
||||
node_config: Arc<NodeConfig>,
|
||||
open_check_local_file: Option<Pin<Box<dyn Future<Output=Result<tokio::fs::File, std::io::Error>> + Send>>>,
|
||||
fut2: Option<()>,
|
||||
fut2: Option<Pin<Box<dyn Stream<Item=Result<MinMaxAvgScalarBinBatch, Error>> + Send>>>,
|
||||
}
|
||||
|
||||
impl PreBinnedValueStream {
|
||||
@@ -215,17 +216,37 @@ impl PreBinnedValueStream {
|
||||
|
||||
fn try_setup_fetch_prebinned_higher_res(&mut self) {
|
||||
info!("try to find a next better granularity for {:?}", self.patch_coord);
|
||||
let g = self.patch_coord.range.end - self.patch_coord.range.beg;
|
||||
match PreBinnedPatchRange::covering_range(self.patch_coord.range.clone(), 2, 0) {
|
||||
let g = self.patch_coord.spec.bin_t_len;
|
||||
let range = NanoRange {
|
||||
beg: self.patch_coord.patch_beg(),
|
||||
end: self.patch_coord.patch_end(),
|
||||
};
|
||||
match PreBinnedPatchRange::covering_range(range, 2, 0) {
|
||||
Some(range) => {
|
||||
let h = range.grid_spec.bin_t_len;
|
||||
assert!(g / h > 1);
|
||||
assert!(g / h < 20);
|
||||
assert!(g % h == 0);
|
||||
info!("FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}", g, h, g/h, g%h, range);
|
||||
let bin_size = range.grid_spec.bin_t_len;
|
||||
let channel = self.channel.clone();
|
||||
let agg_kind = self.agg_kind.clone();
|
||||
let node_config = self.node_config.clone();
|
||||
let mut patch_it = PreBinnedPatchIterator::from_range(range);
|
||||
let s = futures_util::stream::iter(patch_it)
|
||||
.map(move |coord| {
|
||||
PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone())
|
||||
})
|
||||
.flatten()
|
||||
.map(move |k| {
|
||||
info!("ITEM from sub res bin_size {} {:?}", bin_size, k);
|
||||
k
|
||||
});
|
||||
self.fut2 = Some(Box::pin(s));
|
||||
}
|
||||
None => {
|
||||
info!("NO BETTER GRAN FOUND FOR g {}", g);
|
||||
error!("TODO NO BETTER GRAN FOUND FOR g {}", g);
|
||||
todo!();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -240,7 +261,7 @@ impl Stream for PreBinnedValueStream {
|
||||
use Poll::*;
|
||||
'outer: loop {
|
||||
break if let Some(fut) = self.fut2.as_mut() {
|
||||
todo!()
|
||||
fut.poll_next_unpin(cx)
|
||||
}
|
||||
else if let Some(fut) = self.open_check_local_file.as_mut() {
|
||||
match fut.poll_unpin(cx) {
|
||||
@@ -295,12 +316,12 @@ impl PreBinnedValueFetchedStream {
|
||||
pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc<NodeConfig>) -> Self {
|
||||
let nodeix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster);
|
||||
let node = &node_config.cluster.nodes[nodeix as usize];
|
||||
warn!("TODO defining property of a PreBinnedPatchCoord? patchlen + ix? binsize + patchix? binsize + patchsize + patchix?");
|
||||
let uri: hyper::Uri = format!(
|
||||
"http://{}:{}/api/1/prebinned?beg={}&end={}&channel_backend={}&channel_name={}&agg_kind={:?}",
|
||||
"http://{}:{}/api/1/prebinned?{}&channel_backend={}&channel_name={}&agg_kind={:?}",
|
||||
node.host,
|
||||
node.port,
|
||||
patch_coord.range.beg,
|
||||
patch_coord.range.end,
|
||||
patch_coord.to_url_params_strings(),
|
||||
channel.backend,
|
||||
channel.name,
|
||||
agg_kind,
|
||||
|
||||
@@ -339,13 +339,26 @@ impl PreBinnedPatchRange {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PreBinnedPatchCoord {
|
||||
pub range: NanoRange,
|
||||
pub spec: PreBinnedPatchGridSpec,
|
||||
pub ix: u64,
|
||||
}
|
||||
|
||||
impl PreBinnedPatchCoord {
|
||||
|
||||
pub fn bs(&self) -> u64 {
|
||||
self.range.end - self.range.beg
|
||||
pub fn bin_t_len(&self) -> u64 {
|
||||
self.spec.bin_t_len
|
||||
}
|
||||
|
||||
pub fn patch_t_len(&self) -> u64 {
|
||||
self.spec.patch_t_len
|
||||
}
|
||||
|
||||
pub fn patch_beg(&self) -> u64 {
|
||||
self.spec.patch_t_len * self.ix
|
||||
}
|
||||
|
||||
pub fn patch_end(&self) -> u64 {
|
||||
self.spec.patch_t_len * (self.ix + 1)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -377,10 +390,8 @@ impl Iterator for PreBinnedPatchIterator {
|
||||
}
|
||||
else {
|
||||
let ret = Self::Item {
|
||||
range: NanoRange {
|
||||
beg: (self.range.offset + self.ix) * self.range.grid_spec.patch_t_len,
|
||||
end: (self.range.offset + self.ix + 1) * self.range.grid_spec.patch_t_len,
|
||||
},
|
||||
spec: self.range.grid_spec.clone(),
|
||||
ix: self.range.offset + self.ix,
|
||||
};
|
||||
self.ix += 1;
|
||||
Some(ret)
|
||||
|
||||
Reference in New Issue
Block a user