Refactor and add CacheUsage to query params

Dominik Werder
2021-05-03 11:47:48 +02:00
parent a47ffc843d
commit c5b5986a28
7 changed files with 218 additions and 146 deletions
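For context on the hunks below: the per-request parameters (patch coordinate, channel, aggregation kind) plus the new cache-usage flag are bundled into a PreBinnedQuery that can render itself as a URL query string via make_query_string(). The struct itself is defined in one of the changed files not shown in this excerpt; the following is only a rough sketch of that shape, with simplified stand-ins for the netpod types and with the CacheUsage variants and the cache_usage parameter name assumed.

    // Illustrative sketch only: the real PreBinnedQuery / CacheUsage live in the
    // other changed files of this commit; Channel, AggKind and the patch params
    // are simplified stand-ins here.
    #[derive(Clone, Debug)]
    enum CacheUsage {
        Use, // variant names are assumptions
        Ignore,
    }

    #[derive(Clone, Debug)]
    struct Channel {
        backend: String,
        name: String,
    }

    #[derive(Clone, Debug)]
    struct PreBinnedQuery {
        patch_params: String, // stand-in for PreBinnedPatchCoord::to_url_params_strings()
        channel: Channel,
        agg_kind: String, // stand-in for AggKind
        cache_usage: CacheUsage,
    }

    impl PreBinnedQuery {
        // Mirrors how pbvfs.rs now builds the URI: everything that used to be
        // appended by hand (channel_backend, channel_name, agg_kind) plus the
        // new cache_usage parameter (parameter name assumed).
        fn make_query_string(&self) -> String {
            format!(
                "{}&channel_backend={}&channel_name={}&agg_kind={}&cache_usage={:?}",
                self.patch_params, self.channel.backend, self.channel.name, self.agg_kind, self.cache_usage
            )
        }
    }

    fn main() {
        let q = PreBinnedQuery {
            patch_params: "bin_t_len=3600000000000&patch_ix=12".into(), // example values only
            channel: Channel { backend: "backend1".into(), name: "chan1".into() },
            agg_kind: "scalar".into(),
            cache_usage: CacheUsage::Use,
        };
        println!("http://node:8080/api/1/prebinned?{}", q.make_query_string());
    }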

disk/src/cache/pbv.rs

@@ -1,6 +1,6 @@
 use crate::agg::binnedt::IntoBinnedT;
 use crate::cache::pbvfs::{PreBinnedFrame, PreBinnedItem, PreBinnedValueFetchedStream};
-use crate::cache::{node_ix_for_patch, MergedFromRemotes};
+use crate::cache::{node_ix_for_patch, MergedFromRemotes, PreBinnedQuery};
 use crate::frame::makeframe::make_frame;
 use crate::raw::EventsQuery;
 use bytes::Bytes;
@@ -9,11 +9,8 @@ use futures_core::Stream;
 use futures_util::{FutureExt, StreamExt};
 use netpod::log::*;
 use netpod::streamext::SCC;
-use netpod::{
-    AggKind, BinnedRange, Channel, NanoRange, NodeConfigCached, PreBinnedPatchCoord, PreBinnedPatchIterator,
-    PreBinnedPatchRange,
-};
-use std::future::Future;
+use netpod::{BinnedRange, NodeConfigCached, PreBinnedPatchIterator, PreBinnedPatchRange};
+use std::future::{ready, Future};
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -24,12 +21,10 @@ pub struct PreBinnedValueByteStreamInner {
 }
 
 pub fn pre_binned_value_byte_stream_new(
-    patch: PreBinnedPatchCoord,
-    channel: Channel,
-    agg_kind: AggKind,
+    query: &PreBinnedQuery,
     node_config: &NodeConfigCached,
 ) -> PreBinnedValueByteStream {
-    let s1 = PreBinnedValueStream::new(patch, channel, agg_kind, node_config);
+    let s1 = PreBinnedValueStream::new(query.clone(), node_config);
     let s2 = PreBinnedValueByteStreamInner { inp: s1 };
     SCC::new(s2)
 }
@@ -51,9 +46,7 @@ impl Stream for PreBinnedValueByteStreamInner {
 }
 
 pub struct PreBinnedValueStream {
-    patch_coord: PreBinnedPatchCoord,
-    channel: Channel,
-    agg_kind: AggKind,
+    query: PreBinnedQuery,
     node_config: NodeConfigCached,
     open_check_local_file: Option<Pin<Box<dyn Future<Output = Result<tokio::fs::File, std::io::Error>> + Send>>>,
     fut2: Option<Pin<Box<dyn Stream<Item = PreBinnedFrame> + Send>>>,
@@ -62,18 +55,11 @@ pub struct PreBinnedValueStream {
 }
 
 impl PreBinnedValueStream {
-    pub fn new(
-        patch_coord: PreBinnedPatchCoord,
-        channel: Channel,
-        agg_kind: AggKind,
-        node_config: &NodeConfigCached,
-    ) -> Self {
+    pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached) -> Self {
         // TODO check that we are the correct node.
-        let _node_ix = node_ix_for_patch(&patch_coord, &channel, &node_config.node_config.cluster);
+        let _node_ix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster);
         Self {
-            patch_coord,
-            channel,
-            agg_kind,
+            query,
             node_config: node_config.clone(),
             open_check_local_file: None,
             fut2: None,
@@ -83,33 +69,53 @@ impl PreBinnedValueStream {
     }
 
     fn try_setup_fetch_prebinned_higher_res(&mut self) {
-        info!("try to find a next better granularity for {:?}", self.patch_coord);
-        let g = self.patch_coord.bin_t_len();
-        let range = NanoRange {
-            beg: self.patch_coord.patch_beg(),
-            end: self.patch_coord.patch_end(),
-        };
-        match PreBinnedPatchRange::covering_range(range, self.patch_coord.bin_count() + 1) {
+        info!("try_setup_fetch_prebinned_higher_res for {:?}", self.query.patch);
+        let g = self.query.patch.bin_t_len();
+        let range = self.query.patch.patch_range();
+        match PreBinnedPatchRange::covering_range(range, self.query.patch.bin_count() + 1) {
             Some(range) => {
                 let h = range.grid_spec.bin_t_len();
                 info!(
-                    "FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}",
+                    "try_setup_fetch_prebinned_higher_res found g {} h {} ratio {} mod {} {:?}",
                     g,
                     h,
                     g / h,
                     g % h,
-                    range
+                    range,
                 );
-                assert!(g / h > 1);
-                assert!(g / h < 200);
-                assert!(g % h == 0);
-                let channel = self.channel.clone();
-                let agg_kind = self.agg_kind.clone();
+                if g / h <= 1 {
+                    error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
+                    return;
+                }
+                if g / h > 200 {
+                    error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
+                    return;
+                }
+                if g % h != 0 {
+                    error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
+                    return;
+                }
                 let node_config = self.node_config.clone();
                 let patch_it = PreBinnedPatchIterator::from_range(range);
                 let s = futures_util::stream::iter(patch_it)
-                    .map(move |coord| {
-                        PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), &node_config)
+                    .map({
+                        let q2 = self.query.clone();
+                        move |patch| {
+                            let query = PreBinnedQuery {
+                                patch,
+                                channel: q2.channel.clone(),
+                                agg_kind: q2.agg_kind.clone(),
+                                cache_usage: q2.cache_usage.clone(),
+                            };
+                            PreBinnedValueFetchedStream::new(&query, &node_config)
+                        }
                     })
+                    .filter_map(|k| match k {
+                        Ok(k) => ready(Some(k)),
+                        Err(e) => {
+                            error!("{:?}", e);
+                            ready(None)
+                        }
+                    })
                     .flatten();
                 self.fut2 = Some(Box::pin(s));
@@ -117,32 +123,26 @@ impl PreBinnedValueStream {
             None => {
                 warn!("no better resolution found for g {}", g);
                 let evq = EventsQuery {
-                    channel: self.channel.clone(),
-                    range: NanoRange {
-                        beg: self.patch_coord.patch_beg(),
-                        end: self.patch_coord.patch_end(),
-                    },
-                    agg_kind: self.agg_kind.clone(),
+                    channel: self.query.channel.clone(),
+                    range: self.query.patch.patch_range(),
+                    agg_kind: self.query.agg_kind.clone(),
                 };
-                assert!(self.patch_coord.patch_t_len() % self.patch_coord.bin_t_len() == 0);
+                if self.query.patch.patch_t_len() % self.query.patch.bin_t_len() != 0 {
+                    error!(
+                        "Patch length inconsistency {} {}",
+                        self.query.patch.patch_t_len(),
+                        self.query.patch.bin_t_len()
+                    );
+                    return;
+                }
                 error!("try_setup_fetch_prebinned_higher_res apply all requested transformations and T-binning");
-                let count = self.patch_coord.patch_t_len() / self.patch_coord.bin_t_len();
+                let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len();
                 let range = BinnedRange::covering_range(evq.range.clone(), count).unwrap();
                 let s1 = MergedFromRemotes::new(evq, self.node_config.node_config.cluster.clone());
-                let s2 = s1
-                    .map(|k| {
-                        if k.is_err() {
-                            error!("\n\n\n.................. try_setup_fetch_prebinned_higher_res got ERROR");
-                        } else {
-                            trace!("try_setup_fetch_prebinned_higher_res got some item from MergedFromRemotes");
-                        }
-                        k
-                    })
-                    .into_binned_t(range)
-                    .map(|k| match k {
-                        Ok(k) => PreBinnedFrame(Ok(PreBinnedItem::Batch(k))),
-                        Err(e) => PreBinnedFrame(Err(e)),
-                    });
+                let s2 = s1.into_binned_t(range).map(|k| match k {
+                    Ok(k) => PreBinnedFrame(Ok(PreBinnedItem::Batch(k))),
+                    Err(e) => PreBinnedFrame(Err(e)),
+                });
                 self.fut2 = Some(Box::pin(s2));
             }
         }
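The new pbv.rs code above also changes how failures are handled: instead of assert! panics, inconsistent granularities return early with an error log, and the sub-stream constructor, which now returns Result, is run inside filter_map so that failures are logged and dropped before the surviving streams are flattened. A minimal standalone sketch of that filter_map/ready idiom, using the futures facade crate instead of futures_util directly:

    // Self-contained demo of the filter_map + ready + flatten idiom:
    // fallible constructors yield Result<Stream, E>; errors are logged and
    // skipped, successful inner streams are merged sequentially.
    use futures::executor::block_on;
    use futures::stream::{self, StreamExt};
    use std::future::ready;

    fn main() {
        let inputs: Vec<Result<Vec<i32>, &str>> = vec![Ok(vec![1, 2]), Err("bad patch"), Ok(vec![3])];
        let s = stream::iter(inputs)
            .filter_map(|r| match r {
                Ok(v) => ready(Some(stream::iter(v))),
                Err(e) => {
                    eprintln!("dropping failed sub-stream: {:?}", e);
                    ready(None)
                }
            })
            .flatten();
        let collected: Vec<i32> = block_on(s.collect());
        assert_eq!(collected, vec![1, 2, 3]);
    }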

disk/src/cache/pbvfs.rs

@@ -1,5 +1,5 @@
 use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
-use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead};
+use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery};
 use crate::frame::inmem::InMemoryFrameAsyncReadStream;
 use crate::frame::makeframe::decode_frame;
 use err::Error;
@@ -7,7 +7,7 @@ use futures_core::Stream;
 use futures_util::{pin_mut, FutureExt};
 #[allow(unused_imports)]
 use netpod::log::*;
-use netpod::{AggKind, Channel, NodeConfigCached, PreBinnedPatchCoord};
+use netpod::NodeConfigCached;
 use serde::{Deserialize, Serialize};
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -21,35 +21,26 @@ pub struct PreBinnedValueFetchedStream {
 }
 
 impl PreBinnedValueFetchedStream {
-    pub fn new(
-        patch_coord: PreBinnedPatchCoord,
-        channel: Channel,
-        agg_kind: AggKind,
-        node_config: &NodeConfigCached,
-    ) -> Self {
-        let nodeix = node_ix_for_patch(&patch_coord, &channel, &node_config.node_config.cluster);
+    pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached) -> Result<Self, Error> {
+        let nodeix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster);
         let node = &node_config.node_config.cluster.nodes[nodeix as usize];
         warn!("TODO defining property of a PreBinnedPatchCoord? patchlen + ix? binsize + patchix? binsize + patchsize + patchix?");
         // TODO encapsulate uri creation, how to express aggregation kind?
         let uri: hyper::Uri = format!(
-            "http://{}:{}/api/1/prebinned?{}&channel_backend={}&channel_name={}&agg_kind={:?}",
+            "http://{}:{}/api/1/prebinned?{}",
             node.host,
             node.port,
-            patch_coord.to_url_params_strings(),
-            channel.backend,
-            channel.name,
-            agg_kind,
+            query.make_query_string()
         )
-        .parse()
-        .unwrap();
+        .parse()?;
         info!("PreBinnedValueFetchedStream open uri {}", uri);
-        Self {
+        let ret = Self {
             uri,
             resfut: None,
             res: None,
             errored: false,
             completed: false,
-        }
+        };
+        Ok(ret)
     }
 }
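One consequence of the pbvfs.rs change above: PreBinnedValueFetchedStream::new now returns Result, so the URI is built with .parse()? instead of .unwrap(). That only compiles if the function's error type can be converted from hyper's InvalidUri; whether err::Error already provides such a conversion is not visible in this excerpt. A generic sketch of the pattern, with an illustrative stand-in error type:

    // Sketch of why .parse()? can replace .unwrap() once new() returns Result:
    // the error type needs From<InvalidUri>. "Error" here is an illustrative
    // stand-in, not the crate's err::Error.
    use std::fmt;

    #[derive(Debug)]
    struct Error(String);

    impl fmt::Display for Error {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "{}", self.0)
        }
    }

    impl From<hyper::http::uri::InvalidUri> for Error {
        fn from(e: hyper::http::uri::InvalidUri) -> Self {
            Error(format!("invalid uri: {}", e))
        }
    }

    fn build_uri(host: &str, port: u16, query: &str) -> Result<hyper::Uri, Error> {
        // `?` converts InvalidUri into Error through the From impl above.
        let uri: hyper::Uri = format!("http://{}:{}/api/1/prebinned?{}", host, port, query).parse()?;
        Ok(uri)
    }

    fn main() {
        match build_uri("localhost", 8080, "cache_usage=use") {
            Ok(uri) => println!("{}", uri),
            Err(e) => eprintln!("{}", e),
        }
    }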