From f391eca97014beb9266333984429c756c5657f92 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Thu, 6 May 2021 15:07:31 +0200
Subject: [PATCH] Binned service can now merge from remote if no matching
 pre-binned grid

---
 disk/src/agg/binnedt.rs   |  1 -
 disk/src/binnedstream.rs  | 43 ++++++++++++++-------
 disk/src/cache.rs         | 79 +++++++++++++++++++++++++--------------
 disk/src/cache/pbv.rs     | 13 +++----
 disk/src/cache/pbvfs.rs   | 26 +++++++++----
 disk/src/channelconfig.rs |  1 -
 err/src/lib.rs            |  6 +++
 httpret/src/lib.rs        |  8 ++--
 retrieval/src/test.rs     | 14 +++----
 9 files changed, 121 insertions(+), 70 deletions(-)

diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs
index ed0a5aa..c1f4fed 100644
--- a/disk/src/agg/binnedt.rs
+++ b/disk/src/agg/binnedt.rs
@@ -118,7 +118,6 @@ where
                 return Ready(None);
             } else {
                 self.range_complete_emitted = true;
-                warn!("IntoBinnedTDefaultStream /////////////////////////////////// emit now RangeComplete");
                 // TODO why can't I declare that type?
                 //type TT = ::OutputValue;
                 if let Some(item) = ::OutputValue::make_range_complete_item() {
diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs
index a006862..57008a0 100644
--- a/disk/src/binnedstream.rs
+++ b/disk/src/binnedstream.rs
@@ -23,12 +23,14 @@ impl BinnedStream {
         agg_kind: AggKind,
         cache_usage: CacheUsage,
         node_config: &NodeConfigCached,
-    ) -> Self {
+    ) -> Result<Self, Error> {
         let patches: Vec<_> = patch_it.collect();
-        warn!("BinnedStream::new");
-        for p in &patches {
-            info!("BinnedStream::new patch {:?}", p);
+        let mut sp = String::new();
+        for (i, p) in patches.iter().enumerate() {
+            use std::fmt::Write;
+            write!(sp, " • patch {:2} {:?}\n", i, p)?;
         }
+        info!("BinnedStream::new\n{}", sp);
         use super::agg::binnedt::IntoBinnedT;
         let inp = futures_util::stream::iter(patches.into_iter())
             .map({
@@ -61,24 +63,18 @@ impl BinnedStream {
                        _ => None,
                    }
                }
                Ok(PreBinnedItem::RangeComplete) => {
-                    info!("=================== BINNED STREAM OBSERVES RangeComplete ====================");
-                    Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete))
-                }
+                Ok(PreBinnedItem::RangeComplete) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)),
                Ok(PreBinnedItem::EventDataReadStats(stats)) => {
                    Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)))
                }
                Ok(PreBinnedItem::Log(item)) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item))),
-                Err(e) => {
-                    error!("observe error in stream {:?}", e);
-                    Some(Err(e))
-                }
+                Err(e) => Some(Err(e)),
            };
            ready(g)
        }
    })
    .into_binned_t(range);
-        Self { inp: Box::pin(inp) }
+        Ok(Self { inp: Box::pin(inp) })
     }
 }
@@ -90,3 +86,24 @@ impl Stream for BinnedStream {
         self.inp.poll_next_unpin(cx)
     }
 }
+
+pub struct BinnedStreamFromMerged {
+    inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>,
+}
+
+impl BinnedStreamFromMerged {
+    pub fn new(
+        inp: Pin<Box<dyn Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Send>>,
+    ) -> Result<Self, Error> {
+        Ok(Self { inp })
+    }
+}
+
+impl Stream for BinnedStreamFromMerged {
+    // TODO make this generic over all possible things
+    type Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        self.inp.poll_next_unpin(cx)
+    }
+}
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index 6f31ed2..1d35af3 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -1,6 +1,7 @@
+use crate::agg::binnedt::IntoBinnedT;
 use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
-use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
-use crate::binnedstream::BinnedStream;
+use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
+use crate::binnedstream::{BinnedStream, BinnedStreamFromMerged};
 use crate::cache::pbv::PreBinnedValueByteStream;
 use crate::cache::pbvfs::PreBinnedItem;
 use crate::channelconfig::{extract_matching_config_entry, read_local_config};
@@ -106,7 +107,6 @@ impl PreBinnedQuery {
             channel: channel_from_params(&params)?,
             cache_usage: cache_usage_from_params(&params)?,
         };
-        info!("PreBinnedQuery::from_request {:?}", ret);
         Ok(ret)
     }
@@ -154,13 +154,15 @@ fn cache_usage_from_params(params: &BTreeMap<String, String>) -> Result<CacheUsage, Error>
     }
 }
 
+pub type BinnedBytesForHttpStreamBox = Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>;
+
 pub async fn binned_bytes_for_http(
     node_config: &NodeConfigCached,
     query: &BinnedQuery,
-) -> Result<BinnedBytesForHttpStream, Error> {
+) -> Result<BinnedBytesForHttpStreamBox, Error> {
     if query.channel.backend != node_config.node.backend {
         let err = Error::with_msg(format!(
-            "backend mismatch we {} requested {}",
+            "backend mismatch node: {} requested: {}",
             node_config.node.backend, query.channel.backend
         ));
         return Err(err);
@@ -168,18 +170,18 @@ pub async fn binned_bytes_for_http(
     let range = &query.range;
     let channel_config = read_local_config(&query.channel, &node_config.node).await?;
     let entry = extract_matching_config_entry(range, &channel_config);
-    info!("found config entry {:?}", entry);
-    let range = BinnedRange::covering_range(range.clone(), query.bin_count)
-        .ok_or(Error::with_msg(format!("BinnedRange::covering_range returned None")))?;
+    info!("binned_bytes_for_http found config entry {:?}", entry);
+    let range = BinnedRange::covering_range(range.clone(), query.bin_count).ok_or(Error::with_msg(format!(
+        "binned_bytes_for_http BinnedRange::covering_range returned None"
+    )))?;
     match PreBinnedPatchRange::covering_range(query.range.clone(), query.bin_count) {
         Some(pre_range) => {
-            info!("Found pre_range: {:?}", pre_range);
+            info!("binned_bytes_for_http found pre_range: {:?}", pre_range);
             if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
                 let msg = format!(
-                    "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
+                    "binned_bytes_for_http  incompatible ranges:\npre_range: {:?}\nrange: {:?}",
                     pre_range, range
                 );
-                error!("{}", msg);
                 return Err(Error::with_msg(msg));
             }
             let s1 = BinnedStream::new(
@@ -189,28 +191,50 @@ pub async fn binned_bytes_for_http(
                 query.agg_kind.clone(),
                 query.cache_usage.clone(),
                 node_config,
-            );
+            )?;
             let ret = BinnedBytesForHttpStream::new(s1);
-            Ok(ret)
+            Ok(Box::pin(ret))
         }
         None => {
-            // TODO Merge raw data.
-            error!("binned_bytes_for_http TODO merge raw data");
-            todo!()
+            info!(
+                "binned_bytes_for_http no covering range for prebinned, merge from remotes instead {:?}",
+                range
+            );
+            let evq = EventsQuery {
+                channel: query.channel.clone(),
+                range: query.range.clone(),
+                agg_kind: query.agg_kind.clone(),
+            };
+            // TODO do I need to set up more transformations or binning to deliver the requested data?
+            let s1 = MergedFromRemotes::new(evq, node_config.node_config.cluster.clone());
+            let s1 = s1.into_binned_t(range);
+            /*let s1 = s1.map(|k| {
+                use super::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem::*;
+                match k {
+                    Ok(Values(k)) => Ok(PreBinnedItem::Batch(k)),
+                    Ok(RangeComplete) => Ok(PreBinnedItem::RangeComplete),
+                    Ok(EventDataReadStats(stats)) => Ok(PreBinnedItem::EventDataReadStats(stats)),
+                    Ok(Log(item)) => Ok(PreBinnedItem::Log(item)),
+                    Err(e) => Err(e),
+                }
+            });*/
+            let s1 = BinnedStreamFromMerged::new(Box::pin(s1))?;
+            let ret = BinnedBytesForHttpStream::new(s1);
+            Ok(Box::pin(ret))
         }
     }
 }
 
-pub type BinnedBytesForHttpStreamFrame = <BinnedBytesForHttpStream as Stream>::Item;
+pub type BinnedBytesForHttpStreamFrame = <BinnedBytesForHttpStream<BinnedStream> as Stream>::Item;
 
-pub struct BinnedBytesForHttpStream {
-    inp: BinnedStream,
+pub struct BinnedBytesForHttpStream<S> {
+    inp: S,
     errored: bool,
     completed: bool,
 }
 
-impl BinnedBytesForHttpStream {
-    pub fn new(inp: BinnedStream) -> Self {
+impl<S> BinnedBytesForHttpStream<S> {
+    pub fn new(inp: S) -> Self {
         Self {
             inp,
             errored: false,
@@ -219,7 +243,10 @@ impl BinnedBytesForHttpStream {
     }
 }
 
-impl Stream for BinnedBytesForHttpStream {
+impl<S> Stream for BinnedBytesForHttpStream<S>
+where
+    S: Stream<Item = Result<MinMaxAvgScalarBinBatchStreamItem, Error>> + Unpin,
+{
     type Item = Result<Bytes, Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
@@ -255,10 +282,9 @@ pub fn pre_binned_bytes_for_http(
     node_config: &NodeConfigCached,
     query: &PreBinnedQuery,
 ) -> Result<PreBinnedValueByteStream, Error> {
-    info!("pre_binned_bytes_for_http {:?} {:?}", query, node_config.node);
     if query.channel.backend != node_config.node.backend {
         let err = Error::with_msg(format!(
-            "backend mismatch we {} requested {}",
+            "backend mismatch node: {} requested: {}",
             node_config.node.backend, query.channel.backend
         ));
         return Err(err);
@@ -400,7 +426,6 @@ impl Stream for MergedFromRemotes {
                         pin_mut!(f);
                         match f.poll(cx) {
                             Ready(Ok(k)) => {
-                                info!("MergedFromRemotes tcp_establish_futs ESTABLISHED INPUT {}", i1);
                                 self.nodein[i1] = Some(k);
                             }
                             Ready(Err(e)) => {
@@ -523,9 +548,8 @@ pub async fn write_pb_cache_min_max_avg_scalar(
         agg_kind: agg_kind.clone(),
     };
     let path = cfd.path(&node_config);
-    info!("Writing cache file\n{:?}\npath: {:?}", cfd, path);
     let enc = serde_cbor::to_vec(&values)?;
-    info!("Encoded size: {}", enc.len());
+    info!("Writing cache file size {}\n{:?}\npath: {:?}", enc.len(), cfd, path);
     tokio::fs::create_dir_all(path.parent().unwrap()).await?;
     tokio::task::spawn_blocking({
         let path = path.clone();
@@ -550,8 +574,7 @@ pub async fn write_pb_cache_min_max_avg_scalar(
 pub async fn read_pbv(mut file: File) -> Result<PreBinnedItem, Error> {
     let mut buf = vec![];
     file.read_to_end(&mut buf).await?;
-    info!("Read cached file len {}", buf.len());
+    trace!("Read cached file len {}", buf.len());
     let dec: MinMaxAvgScalarBinBatch = serde_cbor::from_slice(&buf)?;
-    info!("Decoded cached file");
     Ok(PreBinnedItem::Batch(dec))
 }
diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs
index 26d8df9..f1f7f4d 100644
--- a/disk/src/cache/pbv.rs
+++ b/disk/src/cache/pbv.rs
@@ -87,9 +87,8 @@ impl PreBinnedValueStream {
         }
     }
 
+    // TODO handle errors also here via return type.
     fn setup_merged_from_remotes(&mut self) {
-        let g = self.query.patch.bin_t_len();
-        warn!("no better resolution found for g {}", g);
         let evq = EventsQuery {
             channel: self.query.channel.clone(),
             range: self.query.patch.patch_range(),
@@ -107,8 +106,8 @@ impl PreBinnedValueStream {
         let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len();
         let range = BinnedRange::covering_range(evq.range.clone(), count).unwrap();
         let s1 = MergedFromRemotes::new(evq, self.node_config.node_config.cluster.clone());
-        let s2 = s1.into_binned_t(range);
-        let s2 = s2.map(|k| {
+        let s1 = s1.into_binned_t(range);
+        let s1 = s1.map(|k| {
             use MinMaxAvgScalarBinBatchStreamItem::*;
             match k {
                 Ok(Values(k)) => Ok(PreBinnedItem::Batch(k)),
@@ -118,13 +117,13 @@ impl PreBinnedValueStream {
                 Err(e) => Err(e),
             }
         });
-        self.fut2 = Some(Box::pin(s2));
+        self.fut2 = Some(Box::pin(s1));
     }
 
     fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) {
         let g = self.query.patch.bin_t_len();
         let h = range.grid_spec.bin_t_len();
-        info!(
+        trace!(
             "try_setup_fetch_prebinned_higher_res found g {} h {} ratio {} mod {} {:?}",
             g,
             h,
@@ -172,7 +171,7 @@ impl PreBinnedValueStream {
     }
 
     fn try_setup_fetch_prebinned_higher_res(&mut self) {
-        info!("try_setup_fetch_prebinned_higher_res for {:?}", self.query.patch);
+        trace!("try_setup_fetch_prebinned_higher_res for {:?}", self.query.patch);
         let range = self.query.patch.patch_range();
         match PreBinnedPatchRange::covering_range(range, self.query.patch.bin_count() + 1) {
             Some(range) => {
diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs
index bf08d8d..9b1318a 100644
--- a/disk/src/cache/pbvfs.rs
+++ b/disk/src/cache/pbvfs.rs
@@ -6,6 +6,7 @@ use crate::streamlog::LogItem;
 use err::Error;
 use futures_core::Stream;
 use futures_util::{pin_mut, FutureExt};
+use http::StatusCode;
 #[allow(unused_imports)]
 use netpod::log::*;
 use netpod::{EventDataReadStats, NodeConfigCached};
@@ -25,7 +26,6 @@ impl PreBinnedValueFetchedStream {
     pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached) -> Result<Self, Error> {
         let nodeix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster);
         let node = &node_config.node_config.cluster.nodes[nodeix as usize];
-        warn!("TODO defining property of a PreBinnedPatchCoord? patchlen + ix? binsize + patchix? binsize + patchsize + patchix?");
         let uri: hyper::Uri = format!(
             "http://{}:{}/api/1/prebinned?{}",
             node.host,
@@ -33,7 +33,6 @@ impl PreBinnedValueFetchedStream {
             query.make_query_string()
         )
         .parse()?;
-        info!("PreBinnedValueFetchedStream open uri {}", uri);
         let ret = Self {
             uri,
             resfut: None,
@@ -96,11 +95,23 @@ impl Stream for PreBinnedValueFetchedStream {
                     match resfut.poll_unpin(cx) {
                         Ready(res) => match res {
                             Ok(res) => {
-                                info!("PreBinnedValueFetchedStream GOT result from SUB REQUEST: {:?}", res);
-                                let s1 = HttpBodyAsAsyncRead::new(res);
-                                let s2 = InMemoryFrameAsyncReadStream::new(s1);
-                                self.res = Some(s2);
-                                continue 'outer;
+                                if res.status() == StatusCode::OK {
+                                    let s1 = HttpBodyAsAsyncRead::new(res);
+                                    let s2 = InMemoryFrameAsyncReadStream::new(s1);
+                                    self.res = Some(s2);
+                                    continue 'outer;
+                                } else {
+                                    error!(
+                                        "PreBinnedValueFetchedStream got non-OK result from sub request: {:?}",
+                                        res
+                                    );
+                                    let e = Error::with_msg(format!(
+                                        "PreBinnedValueFetchedStream got non-OK result from sub request: {:?}",
+                                        res
+                                    ));
+                                    self.errored = true;
+                                    Ready(Some(Err(e)))
+                                }
                             }
                             Err(e) => {
                                 error!("PreBinnedValueStream error in stream {:?}", e);
@@ -118,7 +129,6 @@ impl Stream for PreBinnedValueFetchedStream {
                     {
                         Ok(req) => {
                             let client = hyper::Client::new();
-                            info!("PreBinnedValueFetchedStream START REQUEST FOR {:?}", req);
                             self.resfut = Some(client.request(req));
                             continue 'outer;
                         }
diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs
index 27d695f..65cbf24 100644
--- a/disk/src/channelconfig.rs
+++ b/disk/src/channelconfig.rs
@@ -248,7 +248,6 @@ pub async fn read_local_config(channel: &Channel, node: &Node) -> Result
diff --git a/err/src/lib.rs b/err/src/lib.rs
--- a/err/src/lib.rs
+++ b/err/src/lib.rs
@@ … @@ impl From<…> for Error {
     }
 }
 
+impl From<std::fmt::Error> for Error {
+    fn from(k: std::fmt::Error) -> Self {
+        Self::with_msg(k.to_string())
+    }
+}
+
 pub fn todo() {
     todo!("TODO");
 }
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 44a4f35..eada06a 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -24,8 +24,7 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
     use std::str::FromStr;
     let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?;
     let make_service = make_service_fn({
-        move |conn| {
-            info!("new raw {:?}", conn);
+        move |_conn| {
             let node_config = node_config.clone();
             async move {
                 Ok::<_, Error>(service_fn({
@@ -224,7 +223,7 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result
        Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?,
        Err(e) => {
-            error!("{:?}", e);
+            error!("fn binned: {:?}", e);
            response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
        }
    };
@@ -237,7 +236,6 @@ async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result
     let desc = format!("pre-b-{}", q.patch().bin_t_len() / 1000000000);
     let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str());
     span1.in_scope(|| {
-        trace!("prebinned");
         let ret = match disk::cache::pre_binned_bytes_for_http(node_config, &q) {
            Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(
                s,
                format!(
                    "pre-b-{}",
                    q.patch().bin_t_len() / 1000000000
                ),
            ))?,
            Err(e) => {
-                error!("{:?}", e);
+                error!("fn prebinned: {:?}", e);
                response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
            }
        };
diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs
index cecabcf..a67bdbe 100644
--- a/retrieval/src/test.rs
+++ b/retrieval/src/test.rs
@@ -7,6 +7,7 @@ use disk::streamlog::Streamlog;
 use err::Error;
 use futures_util::StreamExt;
 use futures_util::TryStreamExt;
+use http::StatusCode;
 use hyper::Body;
 use netpod::log::*;
 use netpod::{Cluster, Database, Node};
@@ -90,7 +91,7 @@ where
     let node0 = &cluster.nodes[0];
     let beg_date: DateTime<Utc> = beg_date.as_ref().parse()?;
     let end_date: DateTime<Utc> = end_date.as_ref().parse()?;
-    let channel_backend = "back";
+    let channel_backend = "testbackend";
     let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ";
     let uri = format!(
         "http://{}:{}/api/1/binned?channel_backend={}&channel_name={}&bin_count={}&beg_date={}&end_date={}",
@@ -102,23 +103,22 @@ where
         node0.host,
         node0.port,
         channel_backend,
         channel_name.as_ref(),
         bin_count,
         beg_date.format(date_fmt),
         end_date.format(date_fmt),
     );
-    info!("URI {:?}", uri);
+    info!("get_binned_channel get {}", uri);
     let req = hyper::Request::builder()
         .method(http::Method::GET)
         .uri(uri)
         .body(Body::empty())?;
-    info!("Request for {:?}", req);
     let client = hyper::Client::new();
     let res = client.request(req).await?;
-    info!("client response {:?}", res);
-    //let (res_head, mut res_body) = res.into_parts();
+    if res.status() != StatusCode::OK {
+        error!("client response {:?}", res);
+    }
     let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);
     let s2 = InMemoryFrameAsyncReadStream::new(s1);
     let res = consume_binned_response(s2).await?;
     let t2 = chrono::Utc::now();
     let ms = t2.signed_duration_since(t1).num_milliseconds() as u64;
-    //let throughput = ntot / 1024 * 1000 / ms;
-    info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms);
+    info!("get_cached_0 DONE  bin_count {}  time {} ms", res.bin_count, ms);
     Ok(())
 }
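
Note on the new fallback path (illustration only, not part of the diff): the heart of this change is in binned_bytes_for_http. When PreBinnedPatchRange::covering_range finds no pre-binned grid matching the requested bin count, the service no longer hits todo!(); it merges the raw event streams of all cluster nodes (MergedFromRemotes) and time-bins the merged stream on the fly (into_binned_t). The sketch below mirrors that shape with plain vectors standing in for the network streams; Bin, merge_and_bin and the bin fields are stand-ins, not the repo's API, and unlike the real IntoBinnedT this sketch does not emit empty bins between events.

    // Dependency-free sketch of "merge per-node events, then bin by time".
    #[derive(Debug)]
    struct Bin {
        ts: u64, // bin start, same unit as the event timestamps
        count: u64,
        min: f32,
        max: f32,
        avg: f32,
    }

    fn merge_and_bin(per_node: Vec<Vec<(u64, f32)>>, bin_len: u64) -> Vec<Bin> {
        // Stand-in for the streaming k-way merge done by MergedFromRemotes:
        // each node yields its events in timestamp order, so flatten + sort
        // produces the same sequence.
        let mut evs: Vec<(u64, f32)> = per_node.into_iter().flatten().collect();
        evs.sort_by_key(|&(ts, _)| ts);
        let mut bins: Vec<Bin> = Vec::new();
        for (ts, v) in evs {
            let ts_bin = ts / bin_len * bin_len;
            match bins.last_mut() {
                Some(b) if b.ts == ts_bin => {
                    b.count += 1;
                    b.min = b.min.min(v);
                    b.max = b.max.max(v);
                    b.avg += (v - b.avg) / b.count as f32; // running mean
                }
                _ => bins.push(Bin { ts: ts_bin, count: 1, min: v, max: v, avg: v }),
            }
        }
        bins
    }

    fn main() {
        let node_a = vec![(2, 1.0), (13, 3.0)];
        let node_b = vec![(7, 2.0), (11, 5.0)];
        // With bin_len 10 this yields a bin starting at 0 and one at 10.
        for b in merge_and_bin(vec![node_a, node_b], 10) {
            println!("{:?}", b);
        }
    }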
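
Clients reach the new path through the same endpoint as before; following the URI built in retrieval/src/test.rs, a request looks like this (host, port, channel name, bin count and dates are placeholders, date format per date_fmt above):

    GET http://<host>:<port>/api/1/binned?channel_backend=testbackend&channel_name=<name>&bin_count=10&beg_date=2021-01-01T00:00:00.000Z&end_date=2021-01-01T01:00:00.000Z

Whether the bins come from the pre-binned cache or from the merge-from-remotes fallback is decided per request on the server and is not visible in the request itself.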