Binned service can now merge from remote if no matching pre-binned grid

Dominik Werder
2021-05-06 15:07:31 +02:00
parent 2f4d2ccea9
commit f391eca970
9 changed files with 121 additions and 70 deletions
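
In outline, the change below does the following (a sketch assembled from the hunks, using the diff's own names; arguments and error handling are abbreviated, so this is not compilable as-is):

match PreBinnedPatchRange::covering_range(query.range.clone(), query.bin_count) {
    Some(pre_range) => {
        // As before: assemble the answer from cached pre-binned patches.
        let s1 = BinnedStream::new(/* patch iter, query params, node_config */)?;
        Ok(Box::pin(BinnedBytesForHttpStream::new(s1)))
    }
    None => {
        // New: no pre-binned grid covers the request, so merge raw events
        // from the remote nodes and bin them in time on the fly.
        let s1 = MergedFromRemotes::new(evq, node_config.node_config.cluster.clone());
        let s1 = s1.into_binned_t(range);
        let s1 = BinnedStreamFromMerged::new(Box::pin(s1))?;
        Ok(Box::pin(BinnedBytesForHttpStream::new(s1)))
    }
}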

View File

@@ -118,7 +118,6 @@ where
                     return Ready(None);
                 } else {
                     self.range_complete_emitted = true;
-                    warn!("IntoBinnedTDefaultStream /////////////////////////////////// emit now RangeComplete");
                     // TODO why can't I declare that type?
                     //type TT = <I::Aggregator as AggregatorTdim>::OutputValue;
                     if let Some(item) = <I::Aggregator as AggregatorTdim>::OutputValue::make_range_complete_item() {

View File

@@ -23,12 +23,14 @@ impl BinnedStream {
         agg_kind: AggKind,
         cache_usage: CacheUsage,
         node_config: &NodeConfigCached,
-    ) -> Self {
+    ) -> Result<Self, Error> {
         let patches: Vec<_> = patch_it.collect();
-        warn!("BinnedStream::new");
-        for p in &patches {
-            info!("BinnedStream::new patch {:?}", p);
+        let mut sp = String::new();
+        for (i, p) in patches.iter().enumerate() {
+            use std::fmt::Write;
+            write!(sp, " • patch {:2} {:?}\n", i, p)?;
         }
+        info!("BinnedStream::new\n{}", sp);
         use super::agg::binnedt::IntoBinnedT;
         let inp = futures_util::stream::iter(patches.into_iter())
             .map({
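
A side note on the new logging loop above: write! into a String needs std::fmt::Write in scope and yields a std::fmt::Result, which the ? propagates (so the surrounding Error type must convert from fmt::Error, hence the new Result return type). A self-contained sketch of the same pattern:

use std::fmt::Write;

fn main() -> Result<(), std::fmt::Error> {
    let patches = ["patch-a", "patch-b"];
    let mut sp = String::new();
    for (i, p) in patches.iter().enumerate() {
        write!(sp, " • patch {:2} {:?}\n", i, p)?;
    }
    print!("{}", sp);
    Ok(())
}
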
@@ -61,24 +63,18 @@ impl BinnedStream {
                        _ => None,
                    }
                }
-               Ok(PreBinnedItem::RangeComplete) => {
-                   info!("=================== BINNED STREAM OBSERVES RangeComplete ====================");
-                   Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete))
-               }
+               Ok(PreBinnedItem::RangeComplete) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)),
                Ok(PreBinnedItem::EventDataReadStats(stats)) => {
                    Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)))
                }
                Ok(PreBinnedItem::Log(item)) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item))),
-               Err(e) => {
-                   error!("observe error in stream {:?}", e);
-                   Some(Err(e))
-               }
+               Err(e) => Some(Err(e)),
            };
            ready(g)
        }
        })
        .into_binned_t(range);
-       Self { inp: Box::pin(inp) }
+       Ok(Self { inp: Box::pin(inp) })
    }
 }

@@ -90,3 +86,24 @@ impl Stream for BinnedStream {
         self.inp.poll_next_unpin(cx)
     }
 }
+
+pub struct BinnedStreamFromMerged {
+    inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>,
+}
+
+impl BinnedStreamFromMerged {
+    pub fn new(
+        inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>,
+    ) -> Result<Self, Error> {
+        Ok(Self { inp })
+    }
+}
+
+impl Stream for BinnedStreamFromMerged {
+    // TODO make this generic over all possible things
+    type Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        self.inp.poll_next_unpin(cx)
+    }
+}
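
BinnedStreamFromMerged exists so the merged pipeline, whose concrete combinator type is deeply nested and awkward to name, can be passed around as one nameable type. A generic sketch of that boxing pattern (stand-in item type, not the crate's own):

use futures_core::Stream;
use futures_util::{stream, StreamExt};
use std::pin::Pin;

type BoxedStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;

fn make_pipeline() -> BoxedStream<u32> {
    // Stand-in for MergedFromRemotes -> into_binned_t -> wrapper.
    stream::iter(0..4).map(|x| x * 2).boxed()
}

fn main() {
    let v: Vec<u32> = futures_executor::block_on(make_pipeline().collect::<Vec<_>>());
    assert_eq!(v, vec![0, 2, 4, 6]);
}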

View File

@@ -1,6 +1,7 @@
+use crate::agg::binnedt::IntoBinnedT;
 use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
-use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
-use crate::binnedstream::BinnedStream;
+use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
+use crate::binnedstream::{BinnedStream, BinnedStreamFromMerged};
 use crate::cache::pbv::PreBinnedValueByteStream;
 use crate::cache::pbvfs::PreBinnedItem;
 use crate::channelconfig::{extract_matching_config_entry, read_local_config};
@@ -106,7 +107,6 @@ impl PreBinnedQuery {
             channel: channel_from_params(&params)?,
             cache_usage: cache_usage_from_params(&params)?,
         };
-        info!("PreBinnedQuery::from_request {:?}", ret);
         Ok(ret)
     }

@@ -154,13 +154,15 @@ fn cache_usage_from_params(params: &BTreeMap<String, String>) -> Result<CacheUsage, Error> {
     Ok(ret)
 }

+type BinnedStreamBox = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
+
 pub async fn binned_bytes_for_http(
     node_config: &NodeConfigCached,
     query: &BinnedQuery,
-) -> Result<BinnedBytesForHttpStream, Error> {
+) -> Result<BinnedStreamBox, Error> {
     if query.channel.backend != node_config.node.backend {
         let err = Error::with_msg(format!(
-            "backend mismatch we {} requested {}",
+            "backend mismatch node: {} requested: {}",
             node_config.node.backend, query.channel.backend
         ));
         return Err(err);
@@ -168,18 +170,18 @@ pub async fn binned_bytes_for_http(
     let range = &query.range;
     let channel_config = read_local_config(&query.channel, &node_config.node).await?;
     let entry = extract_matching_config_entry(range, &channel_config);
-    info!("found config entry {:?}", entry);
-    let range = BinnedRange::covering_range(range.clone(), query.bin_count)
-        .ok_or(Error::with_msg(format!("BinnedRange::covering_range returned None")))?;
+    info!("binned_bytes_for_http found config entry {:?}", entry);
+    let range = BinnedRange::covering_range(range.clone(), query.bin_count).ok_or(Error::with_msg(format!(
+        "binned_bytes_for_http BinnedRange::covering_range returned None"
+    )))?;
     match PreBinnedPatchRange::covering_range(query.range.clone(), query.bin_count) {
         Some(pre_range) => {
-            info!("Found pre_range: {:?}", pre_range);
+            info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
             if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
                 let msg = format!(
-                    "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
+                    "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
                     pre_range, range
                 );
-                error!("{}", msg);
                 return Err(Error::with_msg(msg));
             }
             let s1 = BinnedStream::new(
@@ -189,28 +191,50 @@ pub async fn binned_bytes_for_http(
                 query.agg_kind.clone(),
                 query.cache_usage.clone(),
                 node_config,
-            );
+            )?;
             let ret = BinnedBytesForHttpStream::new(s1);
-            Ok(ret)
+            Ok(Box::pin(ret))
         }
         None => {
-            // TODO Merge raw data.
-            error!("binned_bytes_for_http TODO merge raw data");
-            todo!()
+            info!(
+                "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
+                range
+            );
+            let evq = EventsQuery {
+                channel: query.channel.clone(),
+                range: query.range.clone(),
+                agg_kind: query.agg_kind.clone(),
+            };
+            // TODO do I need to set up more transformations or binning to deliver the requested data?
+            let s1 = MergedFromRemotes::new(evq, node_config.node_config.cluster.clone());
+            let s1 = s1.into_binned_t(range);
+            /*let s1 = s1.map(|k| {
+                use super::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem::*;
+                match k {
+                    Ok(Values(k)) => Ok(PreBinnedItem::Batch(k)),
+                    Ok(RangeComplete) => Ok(PreBinnedItem::RangeComplete),
+                    Ok(EventDataReadStats(stats)) => Ok(PreBinnedItem::EventDataReadStats(stats)),
+                    Ok(Log(item)) => Ok(PreBinnedItem::Log(item)),
+                    Err(e) => Err(e),
+                }
+            });*/
+            let s1 = BinnedStreamFromMerged::new(Box::pin(s1))?;
+            let ret = BinnedBytesForHttpStream::new(s1);
+            Ok(Box::pin(ret))
         }
     }
 }

 pub type BinnedBytesForHttpStreamFrame = <BinnedStream as Stream>::Item;

-pub struct BinnedBytesForHttpStream {
-    inp: BinnedStream,
+pub struct BinnedBytesForHttpStream<S> {
+    inp: S,
     errored: bool,
     completed: bool,
 }

-impl BinnedBytesForHttpStream {
-    pub fn new(inp: BinnedStream) -> Self {
+impl<S> BinnedBytesForHttpStream<S> {
+    pub fn new(inp: S) -> Self {
         Self {
             inp,
             errored: false,
@@ -219,7 +243,10 @@ impl BinnedBytesForHttpStream {
     }
 }

-impl Stream for BinnedBytesForHttpStream {
+impl<S> Stream for BinnedBytesForHttpStream<S>
+where
+    S: Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Unpin,
+{
     type Item = Result<Bytes, Error>;

     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
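
The errored/completed fields suggest the usual fuse discipline in poll_next, whose body is not part of this hunk. The following is a guess at that standard pattern with stand-in types, not the crate's actual implementation (which additionally frames each item into Bytes):

use std::pin::Pin;
use std::task::{Context, Poll};

use futures_core::Stream;
use futures_util::StreamExt;

struct Fused<S> {
    inp: S,
    errored: bool,
    completed: bool,
}

impl<S, T, E> Stream for Fused<S>
where
    S: Stream<Item = Result<T, E>> + Unpin,
{
    type Item = Result<T, E>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        if self.completed {
            panic!("poll_next called after stream completed");
        }
        if self.errored {
            self.completed = true;
            return Poll::Ready(None);
        }
        match self.inp.poll_next_unpin(cx) {
            Poll::Ready(Some(Err(e))) => {
                // Forward the error once, then end the stream on the next poll.
                self.errored = true;
                Poll::Ready(Some(Err(e)))
            }
            Poll::Ready(None) => {
                self.completed = true;
                Poll::Ready(None)
            }
            other => other,
        }
    }
}
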
@@ -255,10 +282,9 @@ pub fn pre_binned_bytes_for_http(
     node_config: &NodeConfigCached,
     query: &PreBinnedQuery,
 ) -> Result<PreBinnedValueByteStream, Error> {
-    info!("pre_binned_bytes_for_http {:?} {:?}", query, node_config.node);
     if query.channel.backend != node_config.node.backend {
         let err = Error::with_msg(format!(
-            "backend mismatch we {} requested {}",
+            "backend mismatch node: {} requested: {}",
             node_config.node.backend, query.channel.backend
         ));
         return Err(err);
@@ -400,7 +426,6 @@ impl Stream for MergedFromRemotes {
                     pin_mut!(f);
                     match f.poll(cx) {
                         Ready(Ok(k)) => {
-                            info!("MergedFromRemotes tcp_establish_futs ESTABLISHED INPUT {}", i1);
                             self.nodein[i1] = Some(k);
                         }
                         Ready(Err(e)) => {
@@ -523,9 +548,8 @@ pub async fn write_pb_cache_min_max_avg_scalar(
         agg_kind: agg_kind.clone(),
     };
     let path = cfd.path(&node_config);
-    info!("Writing cache file\n{:?}\npath: {:?}", cfd, path);
     let enc = serde_cbor::to_vec(&values)?;
-    info!("Encoded size: {}", enc.len());
+    info!("Writing cache file size {}\n{:?}\npath: {:?}", enc.len(), cfd, path);
     tokio::fs::create_dir_all(path.parent().unwrap()).await?;
     tokio::task::spawn_blocking({
         let path = path.clone();
@@ -550,8 +574,7 @@ pub async fn write_pb_cache_min_max_avg_scalar(
 pub async fn read_pbv(mut file: File) -> Result<PreBinnedItem, Error> {
     let mut buf = vec![];
     file.read_to_end(&mut buf).await?;
-    info!("Read cached file len {}", buf.len());
+    trace!("Read cached file len {}", buf.len());
     let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(&buf)?;
-    info!("Decoded cached file");
     Ok(PreBinnedItem::Batch(dec))
 }
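
For reference, the cache file format on both sides of this hunk is plain CBOR via serde_cbor (to_vec when writing, from_slice in read_pbv). A round-trip sketch with a stand-in batch type (the field names here are illustrative, not MinMaxAvgScalarBinBatch's actual layout; requires serde with the derive feature):

use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Batch {
    ts1s: Vec<u64>,
    ts2s: Vec<u64>,
    mins: Vec<f32>,
    maxs: Vec<f32>,
    avgs: Vec<f32>,
}

fn main() -> Result<(), serde_cbor::Error> {
    let values = Batch {
        ts1s: vec![0],
        ts2s: vec![1],
        mins: vec![0.0],
        maxs: vec![1.0],
        avgs: vec![0.5],
    };
    let enc = serde_cbor::to_vec(&values)?; // what gets written to the cache file
    let dec: Batch = serde_cbor::from_slice(&enc)?; // what read_pbv does on the way back
    assert_eq!(values, dec);
    Ok(())
}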

disk/src/cache/pbv.rs
View File

@@ -87,9 +87,8 @@ impl PreBinnedValueStream {
         }
     }
     // TODO handle errors also here via return type.
     fn setup_merged_from_remotes(&mut self) {
         let g = self.query.patch.bin_t_len();
-        warn!("no better resolution found for g {}", g);
         let evq = EventsQuery {
             channel: self.query.channel.clone(),
             range: self.query.patch.patch_range(),
@@ -107,8 +106,8 @@ impl PreBinnedValueStream {
         let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len();
         let range = BinnedRange::covering_range(evq.range.clone(), count).unwrap();
         let s1 = MergedFromRemotes::new(evq, self.node_config.node_config.cluster.clone());
-        let s2 = s1.into_binned_t(range);
-        let s2 = s2.map(|k| {
+        let s1 = s1.into_binned_t(range);
+        let s1 = s1.map(|k| {
             use MinMaxAvgScalarBinBatchStreamItem::*;
             match k {
                 Ok(Values(k)) => Ok(PreBinnedItem::Batch(k)),
@@ -118,13 +117,13 @@ impl PreBinnedValueStream {
                 Err(e) => Err(e),
             }
         });
-        self.fut2 = Some(Box::pin(s2));
+        self.fut2 = Some(Box::pin(s1));
     }

     fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) {
         let g = self.query.patch.bin_t_len();
         let h = range.grid_spec.bin_t_len();
-        info!(
+        trace!(
             "try_setup_fetch_prebinned_higher_res found g {} h {} ratio {} mod {} {:?}",
             g,
             h,
@@ -172,7 +171,7 @@ impl PreBinnedValueStream {
     }

     fn try_setup_fetch_prebinned_higher_res(&mut self) {
-        info!("try_setup_fetch_prebinned_higher_res for {:?}", self.query.patch);
+        trace!("try_setup_fetch_prebinned_higher_res for {:?}", self.query.patch);
         let range = self.query.patch.patch_range();
         match PreBinnedPatchRange::covering_range(range, self.query.patch.bin_count() + 1) {
             Some(range) => {
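
Taken together, the two setup functions in this file implement the decision the commit message describes. A condensed, partly inferred sketch (the Some and None arm bodies are abbreviated from the surrounding hunks, not quoted from the file):

fn try_setup_fetch_prebinned_higher_res(&mut self) {
    trace!("try_setup_fetch_prebinned_higher_res for {:?}", self.query.patch);
    let range = self.query.patch.patch_range();
    match PreBinnedPatchRange::covering_range(range, self.query.patch.bin_count() + 1) {
        // A finer pre-binned grid covers this patch: aggregate it down.
        Some(range) => self.setup_from_higher_res_prebinned(range),
        // No matching pre-binned grid: merge raw events from the remote nodes.
        None => self.setup_merged_from_remotes(),
    }
}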

View File

@@ -6,6 +6,7 @@ use crate::streamlog::LogItem;
 use err::Error;
 use futures_core::Stream;
 use futures_util::{pin_mut, FutureExt};
+use http::StatusCode;
 #[allow(unused_imports)]
 use netpod::log::*;
 use netpod::{EventDataReadStats, NodeConfigCached};
@@ -25,7 +26,6 @@ impl PreBinnedValueFetchedStream {
     pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached) -> Result<Self, Error> {
         let nodeix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster);
         let node = &node_config.node_config.cluster.nodes[nodeix as usize];
-        warn!("TODO defining property of a PreBinnedPatchCoord? patchlen + ix? binsize + patchix? binsize + patchsize + patchix?");
         let uri: hyper::Uri = format!(
             "http://{}:{}/api/1/prebinned?{}",
             node.host,
@@ -33,7 +33,6 @@ impl PreBinnedValueFetchedStream {
             query.make_query_string()
         )
         .parse()?;
-        info!("PreBinnedValueFetchedStream open uri {}", uri);
         let ret = Self {
             uri,
             resfut: None,
@@ -96,11 +95,23 @@ impl Stream for PreBinnedValueFetchedStream {
                 match resfut.poll_unpin(cx) {
                     Ready(res) => match res {
                         Ok(res) => {
-                            info!("PreBinnedValueFetchedStream GOT result from SUB REQUEST: {:?}", res);
-                            let s1 = HttpBodyAsAsyncRead::new(res);
-                            let s2 = InMemoryFrameAsyncReadStream::new(s1);
-                            self.res = Some(s2);
-                            continue 'outer;
+                            if res.status() == StatusCode::OK {
+                                let s1 = HttpBodyAsAsyncRead::new(res);
+                                let s2 = InMemoryFrameAsyncReadStream::new(s1);
+                                self.res = Some(s2);
+                                continue 'outer;
+                            } else {
+                                error!(
+                                    "PreBinnedValueFetchedStream got non-OK result from sub request: {:?}",
+                                    res
+                                );
+                                let e = Error::with_msg(format!(
+                                    "PreBinnedValueFetchedStream got non-OK result from sub request: {:?}",
+                                    res
+                                ));
+                                self.errored = true;
+                                Ready(Some(Err(e)))
+                            }
                         }
                         Err(e) => {
                             error!("PreBinnedValueStream error in stream {:?}", e);
@@ -118,7 +129,6 @@ impl Stream for PreBinnedValueFetchedStream {
                 {
                     Ok(req) => {
                         let client = hyper::Client::new();
-                        info!("PreBinnedValueFetchedStream START REQUEST FOR {:?}", req);
                         self.resfut = Some(client.request(req));
                         continue 'outer;
                     }
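
The same status gate in isolation, as a runnable sketch (hyper 0.14-era client API; host, port, and path are placeholders, not the service's real endpoint):

use hyper::{Client, StatusCode, Uri};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let uri: Uri = "http://localhost:8080/api/1/prebinned".parse()?;
    let res = Client::new().get(uri).await?;
    if res.status() == StatusCode::OK {
        // Only an OK response is handed on to the frame decoder.
        let body = hyper::body::to_bytes(res.into_body()).await?;
        println!("got {} bytes", body.len());
        Ok(())
    } else {
        Err(format!("non-OK result from sub request: {:?}", res).into())
    }
}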

View File

@@ -248,7 +248,6 @@ pub async fn read_local_config(channel: &Channel, node: &Node) -> Result<Config, Error> {
         .join("latest")
         .join("00000_Config");
     let buf = tokio::fs::read(&path).await?;
-    info!("try to parse config {} bytes", buf.len());
     let config = parse_config(&buf)?;
     Ok(config.1)
 }