diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs
index 66bd0ec..782c5fb 100644
--- a/disk/src/binnedstream.rs
+++ b/disk/src/binnedstream.rs
@@ -1,5 +1,6 @@
 use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream};
+use crate::cache::{CacheUsage, PreBinnedQuery};
 use err::Error;
 use futures_core::Stream;
 use futures_util::StreamExt;
@@ -20,6 +21,7 @@ impl BinnedStream {
         channel: Channel,
         range: BinnedRange,
         agg_kind: AggKind,
+        cache_usage: CacheUsage,
         node_config: &NodeConfigCached,
     ) -> Self {
         let patches: Vec<_> = patch_it.collect();
@@ -31,7 +33,17 @@ impl BinnedStream {
         let inp = futures_util::stream::iter(patches.into_iter())
             .map({
                 let node_config = node_config.clone();
-                move |coord| PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), &node_config)
+                move |patch| {
+                    let query = PreBinnedQuery::new(patch, channel.clone(), agg_kind.clone(), cache_usage.clone());
+                    PreBinnedValueFetchedStream::new(&query, &node_config)
+                }
+            })
+            .filter_map(|k| match k {
+                Ok(k) => ready(Some(k)),
+                Err(e) => {
+                    error!("{:?}", e);
+                    ready(None)
+                }
             })
             .flatten()
             .filter_map({
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index 8081795..0e8abd9 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -15,7 +15,7 @@ use netpod::{
     AggKind, BinnedRange, Channel, Cluster, NanoRange, NodeConfigCached, PreBinnedPatchCoord, PreBinnedPatchIterator,
     PreBinnedPatchRange, ToNanos,
 };
-use serde::{Deserialize, Serialize};
+use std::collections::BTreeMap;
 use std::future::Future;
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -27,54 +27,139 @@ use tracing::{debug, error, info, trace, warn};
 pub mod pbv;
 pub mod pbvfs;
 
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct Query {
-    range: NanoRange,
-    count: u64,
-    agg_kind: AggKind,
-    channel: Channel,
+#[derive(Clone, Debug)]
+pub enum CacheUsage {
+    Use,
+    Ignore,
+    Recreate,
 }
 
-impl Query {
+#[derive(Clone, Debug)]
+pub struct BinnedQuery {
+    range: NanoRange,
+    bin_count: u64,
+    agg_kind: AggKind,
+    channel: Channel,
+    cache_usage: CacheUsage,
+}
+
+impl BinnedQuery {
     pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
         let params = netpod::query_params(req.uri.query());
-        let beg_date = params
-            .get("beg_date")
-            .ok_or_else(|| Error::with_msg("missing beg_date"))?;
-        let end_date = params
-            .get("end_date")
-            .ok_or_else(|| Error::with_msg("missing end_date"))?;
-        let ret = Query {
+        let beg_date = params.get("beg_date").ok_or(Error::with_msg("missing beg_date"))?;
+        let end_date = params.get("end_date").ok_or(Error::with_msg("missing end_date"))?;
+        let ret = BinnedQuery {
             range: NanoRange {
                 beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
                 end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
             },
-            count: params
+            bin_count: params
                 .get("bin_count")
-                .ok_or_else(|| Error::with_msg("missing beg_date"))?
+                .ok_or(Error::with_msg("missing bin_count"))?
                 .parse()
-                .unwrap(),
+                .map_err(|e| Error::with_msg(format!("can not parse bin_count {:?}", e)))?,
             agg_kind: AggKind::DimXBins1,
-            channel: Channel {
-                backend: params.get("channel_backend").unwrap().into(),
-                name: params.get("channel_name").unwrap().into(),
-            },
+            channel: channel_from_params(&params)?,
+            cache_usage: cache_usage_from_params(&params)?,
         };
+        info!("BinnedQuery::from_request {:?}", ret);
         Ok(ret)
     }
 }
 
+#[derive(Clone, Debug)]
+pub struct PreBinnedQuery {
+    patch: PreBinnedPatchCoord,
+    agg_kind: AggKind,
+    channel: Channel,
+    cache_usage: CacheUsage,
+}
+
+impl PreBinnedQuery {
+    pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, cache_usage: CacheUsage) -> Self {
+        Self {
+            patch,
+            agg_kind,
+            channel,
+            cache_usage,
+        }
+    }
+
+    pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
+        let params = netpod::query_params(req.uri.query());
+        let patch_ix = params
+            .get("patch_ix")
+            .ok_or(Error::with_msg("missing patch_ix"))?
+            .parse()?;
+        let bin_t_len = params
+            .get("bin_t_len")
+            .ok_or(Error::with_msg("missing bin_t_len"))?
+            .parse()?;
+        let ret = PreBinnedQuery {
+            patch: PreBinnedPatchCoord::new(bin_t_len, patch_ix),
+            agg_kind: AggKind::DimXBins1,
+            channel: channel_from_params(&params)?,
+            cache_usage: cache_usage_from_params(&params)?,
+        };
+        info!("PreBinnedQuery::from_request {:?}", ret);
+        Ok(ret)
+    }
+
+    pub fn make_query_string(&self) -> String {
+        format!(
+            "{}&channel_backend={}&channel_name={}&agg_kind={:?}",
+            self.patch.to_url_params_strings(),
+            self.channel.backend,
+            self.channel.name,
+            self.agg_kind
+        )
+    }
+
+    pub fn patch(&self) -> &PreBinnedPatchCoord {
+        &self.patch
+    }
+}
+
+fn channel_from_params(params: &BTreeMap<String, String>) -> Result<Channel, Error> {
+    let ret = Channel {
+        backend: params
+            .get("channel_backend")
+            .ok_or(Error::with_msg("missing channel_backend"))?
+            .into(),
+        name: params
+            .get("channel_name")
+            .ok_or(Error::with_msg("missing channel_name"))?
+            .into(),
+    };
+    Ok(ret)
+}
+
+fn cache_usage_from_params(params: &BTreeMap<String, String>) -> Result<CacheUsage, Error> {
+    let ret = params.get("cache_usage").map_or(Ok::<_, Error>(CacheUsage::Use), |k| {
+        if k == "use" {
+            Ok(CacheUsage::Use)
+        } else if k == "ignore" {
+            Ok(CacheUsage::Ignore)
+        } else if k == "recreate" {
+            Ok(CacheUsage::Recreate)
+        } else {
+            Err(Error::with_msg(format!("unexpected cache_usage {:?}", k)))?
+        }
+    })?;
+    Ok(ret)
+}
+
 pub async fn binned_bytes_for_http(
     node_config: &NodeConfigCached,
-    query: &Query,
+    query: &BinnedQuery,
 ) -> Result {
     let range = &query.range;
     let channel_config = read_local_config(&query.channel, &node_config.node).await?;
     let entry = extract_matching_config_entry(range, &channel_config);
     info!("found config entry {:?}", entry);
-    let range = BinnedRange::covering_range(range.clone(), query.count)
+    let range = BinnedRange::covering_range(range.clone(), query.bin_count)
         .ok_or(Error::with_msg(format!("BinnedRange::covering_range returned None")))?;
-    match PreBinnedPatchRange::covering_range(query.range.clone(), query.count) {
+    match PreBinnedPatchRange::covering_range(query.range.clone(), query.bin_count) {
         Some(pre_range) => {
             info!("Found pre_range: {:?}", pre_range);
             if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
@@ -90,6 +175,7 @@ pub async fn binned_bytes_for_http(
                 query.channel.clone(),
                 range,
                 query.agg_kind.clone(),
+                query.cache_usage.clone(),
                 node_config,
             );
             let ret = BinnedBytesForHttpStream::new(s1);
@@ -150,31 +236,6 @@ impl Stream for BinnedBytesForHttpStream {
     }
 }
 
-#[derive(Clone, Debug)]
-pub struct PreBinnedQuery {
-    pub patch: PreBinnedPatchCoord,
-    agg_kind: AggKind,
-    channel: Channel,
-}
-
-impl PreBinnedQuery {
-    pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
-        let params = netpod::query_params(req.uri.query());
-        let patch_ix = params.get("patch_ix").unwrap().parse().unwrap();
-        let bin_t_len = params.get("bin_t_len").unwrap().parse().unwrap();
-        let ret = PreBinnedQuery {
-            patch: PreBinnedPatchCoord::new(bin_t_len, patch_ix),
-            agg_kind: AggKind::DimXBins1,
-            channel: Channel {
-                backend: params.get("channel_backend").unwrap().into(),
-                name: params.get("channel_name").unwrap().into(),
-            },
-        };
-        info!("PreBinnedQuery::from_request {:?}", ret);
-        Ok(ret)
-    }
-}
-
 // NOTE This answers a request for a single valid pre-binned patch.
 // A user must first make sure that the grid spec is valid, and that this node is responsible for it.
 // Otherwise it is an error.
@@ -183,12 +244,7 @@ pub fn pre_binned_bytes_for_http(
     query: &PreBinnedQuery,
 ) -> Result {
     info!("pre_binned_bytes_for_http {:?} {:?}", query, node_config.node);
-    let ret = super::cache::pbv::pre_binned_value_byte_stream_new(
-        query.patch.clone(),
-        query.channel.clone(),
-        query.agg_kind.clone(),
-        node_config,
-    );
+    let ret = super::cache::pbv::pre_binned_value_byte_stream_new(query, node_config);
     Ok(ret)
 }
 
@@ -288,10 +344,9 @@ impl Stream for MergedFromRemotes {
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
         if self.completed {
-            panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed");
+            panic!("MergedFromRemotes poll_next on completed");
         }
         if self.errored {
-            warn!("MergedFromRemotes return None after Err");
             self.completed = true;
             return Ready(None);
         }
@@ -355,10 +410,10 @@ impl Stream for MergedFromRemotes {
     }
 }
 
-pub struct SomeReturnThing {}
+pub struct BytesWrap {}
 
-impl From<SomeReturnThing> for Bytes {
-    fn from(_k: SomeReturnThing) -> Self {
+impl From<BytesWrap> for Bytes {
+    fn from(_k: BytesWrap) -> Self {
         error!("TODO convert result to octets");
         todo!("TODO convert result to octets")
     }
 }
diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs
index cd90426..553dabb 100644
--- a/disk/src/cache/pbv.rs
+++ b/disk/src/cache/pbv.rs
@@ -1,6 +1,6 @@
 use crate::agg::binnedt::IntoBinnedT;
 use crate::cache::pbvfs::{PreBinnedFrame, PreBinnedItem, PreBinnedValueFetchedStream};
-use crate::cache::{node_ix_for_patch, MergedFromRemotes};
+use crate::cache::{node_ix_for_patch, MergedFromRemotes, PreBinnedQuery};
 use crate::frame::makeframe::make_frame;
 use crate::raw::EventsQuery;
 use bytes::Bytes;
@@ -9,11 +9,8 @@ use futures_core::Stream;
 use futures_util::{FutureExt, StreamExt};
 use netpod::log::*;
 use netpod::streamext::SCC;
-use netpod::{
-    AggKind, BinnedRange, Channel, NanoRange, NodeConfigCached, PreBinnedPatchCoord, PreBinnedPatchIterator,
-    PreBinnedPatchRange,
-};
-use std::future::Future;
+use netpod::{BinnedRange, NodeConfigCached, PreBinnedPatchIterator, PreBinnedPatchRange};
+use std::future::{ready, Future};
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -24,12 +21,10 @@ pub struct PreBinnedValueByteStreamInner {
 }
 
 pub fn pre_binned_value_byte_stream_new(
-    patch: PreBinnedPatchCoord,
-    channel: Channel,
-    agg_kind: AggKind,
+    query: &PreBinnedQuery,
     node_config: &NodeConfigCached,
 ) -> PreBinnedValueByteStream {
-    let s1 = PreBinnedValueStream::new(patch, channel, agg_kind, node_config);
+    let s1 = PreBinnedValueStream::new(query.clone(), node_config);
     let s2 = PreBinnedValueByteStreamInner { inp: s1 };
     SCC::new(s2)
 }
@@ -51,9 +46,7 @@ impl Stream for PreBinnedValueByteStreamInner {
 }
 
 pub struct PreBinnedValueStream {
-    patch_coord: PreBinnedPatchCoord,
-    channel: Channel,
-    agg_kind: AggKind,
+    query: PreBinnedQuery,
     node_config: NodeConfigCached,
     open_check_local_file: Option> + Send>>>,
     fut2: Option + Send>>>,
@@ -62,18 +55,11 @@ impl PreBinnedValueStream {
-    pub fn new(
-        patch_coord: PreBinnedPatchCoord,
-        channel: Channel,
-        agg_kind: AggKind,
-        node_config: &NodeConfigCached,
-    ) -> Self {
+    pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached) -> Self {
         // TODO check that we are the correct node.
-        let _node_ix = node_ix_for_patch(&patch_coord, &channel, &node_config.node_config.cluster);
+        let _node_ix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster);
         Self {
-            patch_coord,
-            channel,
-            agg_kind,
+            query,
             node_config: node_config.clone(),
             open_check_local_file: None,
             fut2: None,
@@ -83,33 +69,53 @@ impl PreBinnedValueStream {
     }
 
     fn try_setup_fetch_prebinned_higher_res(&mut self) {
-        info!("try to find a next better granularity for {:?}", self.patch_coord);
-        let g = self.patch_coord.bin_t_len();
-        let range = NanoRange {
-            beg: self.patch_coord.patch_beg(),
-            end: self.patch_coord.patch_end(),
-        };
-        match PreBinnedPatchRange::covering_range(range, self.patch_coord.bin_count() + 1) {
+        info!("try_setup_fetch_prebinned_higher_res for {:?}", self.query.patch);
+        let g = self.query.patch.bin_t_len();
+        let range = self.query.patch.patch_range();
+        match PreBinnedPatchRange::covering_range(range, self.query.patch.bin_count() + 1) {
             Some(range) => {
                 let h = range.grid_spec.bin_t_len();
                 info!(
-                    "FOUND NEXT GRAN g {} h {} ratio {} mod {} {:?}",
+                    "try_setup_fetch_prebinned_higher_res found g {} h {} ratio {} mod {} {:?}",
                     g,
                     h,
                     g / h,
                     g % h,
-                    range
+                    range,
                 );
-                assert!(g / h > 1);
-                assert!(g / h < 200);
-                assert!(g % h == 0);
-                let channel = self.channel.clone();
-                let agg_kind = self.agg_kind.clone();
+                if g / h <= 1 {
+                    error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
+                    return;
+                }
+                if g / h > 200 {
+                    error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
+                    return;
+                }
+                if g % h != 0 {
+                    error!("try_setup_fetch_prebinned_higher_res g {} h {}", g, h);
+                    return;
+                }
                 let node_config = self.node_config.clone();
                 let patch_it = PreBinnedPatchIterator::from_range(range);
                 let s = futures_util::stream::iter(patch_it)
-                    .map(move |coord| {
-                        PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), &node_config)
+                    .map({
+                        let q2 = self.query.clone();
+                        move |patch| {
+                            let query = PreBinnedQuery {
+                                patch,
+                                channel: q2.channel.clone(),
+                                agg_kind: q2.agg_kind.clone(),
+                                cache_usage: q2.cache_usage.clone(),
+                            };
+                            PreBinnedValueFetchedStream::new(&query, &node_config)
+                        }
+                    })
+                    .filter_map(|k| match k {
+                        Ok(k) => ready(Some(k)),
+                        Err(e) => {
+                            error!("{:?}", e);
+                            ready(None)
+                        }
                     })
                     .flatten();
                 self.fut2 = Some(Box::pin(s));
@@ -117,32 +123,26 @@ impl PreBinnedValueStream {
             None => {
                 warn!("no better resolution found for g {}", g);
                 let evq = EventsQuery {
-                    channel: self.channel.clone(),
-                    range: NanoRange {
-                        beg: self.patch_coord.patch_beg(),
-                        end: self.patch_coord.patch_end(),
-                    },
-                    agg_kind: self.agg_kind.clone(),
+                    channel: self.query.channel.clone(),
+                    range: self.query.patch.patch_range(),
+                    agg_kind: self.query.agg_kind.clone(),
                 };
-                assert!(self.patch_coord.patch_t_len() % self.patch_coord.bin_t_len() == 0);
+                if self.query.patch.patch_t_len() % self.query.patch.bin_t_len() != 0 {
+                    error!(
+                        "Patch length inconsistency {} {}",
+                        self.query.patch.patch_t_len(),
+                        self.query.patch.bin_t_len()
+                    );
+                    return;
+                }
                 error!("try_setup_fetch_prebinned_higher_res apply all requested transformations and T-binning");
-                let count = self.patch_coord.patch_t_len() / self.patch_coord.bin_t_len();
+                let count = self.query.patch.patch_t_len() / self.query.patch.bin_t_len();
                 let range = BinnedRange::covering_range(evq.range.clone(), count).unwrap();
                 let s1 = MergedFromRemotes::new(evq, self.node_config.node_config.cluster.clone());
-                let s2 = s1
-                    .map(|k| {
-                        if k.is_err() {
try_setup_fetch_prebinned_higher_res got ERROR"); - } else { - trace!("try_setup_fetch_prebinned_higher_res got some item from MergedFromRemotes"); - } - k - }) - .into_binned_t(range) - .map(|k| match k { - Ok(k) => PreBinnedFrame(Ok(PreBinnedItem::Batch(k))), - Err(e) => PreBinnedFrame(Err(e)), - }); + let s2 = s1.into_binned_t(range).map(|k| match k { + Ok(k) => PreBinnedFrame(Ok(PreBinnedItem::Batch(k))), + Err(e) => PreBinnedFrame(Err(e)), + }); self.fut2 = Some(Box::pin(s2)); } } diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index d6619ab..616d017 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -1,5 +1,5 @@ use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead}; +use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead, PreBinnedQuery}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::decode_frame; use err::Error; @@ -7,7 +7,7 @@ use futures_core::Stream; use futures_util::{pin_mut, FutureExt}; #[allow(unused_imports)] use netpod::log::*; -use netpod::{AggKind, Channel, NodeConfigCached, PreBinnedPatchCoord}; +use netpod::NodeConfigCached; use serde::{Deserialize, Serialize}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -21,35 +21,26 @@ pub struct PreBinnedValueFetchedStream { } impl PreBinnedValueFetchedStream { - pub fn new( - patch_coord: PreBinnedPatchCoord, - channel: Channel, - agg_kind: AggKind, - node_config: &NodeConfigCached, - ) -> Self { - let nodeix = node_ix_for_patch(&patch_coord, &channel, &node_config.node_config.cluster); + pub fn new(query: &PreBinnedQuery, node_config: &NodeConfigCached) -> Result { + let nodeix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster); let node = &node_config.node_config.cluster.nodes[nodeix as usize]; warn!("TODO defining property of a PreBinnedPatchCoord? patchlen + ix? binsize + patchix? binsize + patchsize + patchix?"); - // TODO encapsulate uri creation, how to express aggregation kind? let uri: hyper::Uri = format!( - "http://{}:{}/api/1/prebinned?{}&channel_backend={}&channel_name={}&agg_kind={:?}", + "http://{}:{}/api/1/prebinned?{}", node.host, node.port, - patch_coord.to_url_params_strings(), - channel.backend, - channel.name, - agg_kind, + query.make_query_string() ) - .parse() - .unwrap(); + .parse()?; info!("PreBinnedValueFetchedStream open uri {}", uri); - Self { + let ret = Self { uri, resfut: None, res: None, errored: false, completed: false, - } + }; + Ok(ret) } } diff --git a/err/src/lib.rs b/err/src/lib.rs index e8726e0..f7d6269 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -2,6 +2,7 @@ Error handling and reporting. 
 */
+use http::uri::InvalidUri;
 use nom::error::ErrorKind;
 use serde::{Deserialize, Serialize, Serializer};
 use std::fmt::Debug;
@@ -198,6 +199,12 @@ impl From> for Error {
     }
 }
 
+impl From<InvalidUri> for Error {
+    fn from(k: InvalidUri) -> Self {
+        Self::with_msg(k.to_string())
+    }
+}
+
 pub fn todoval<T>() -> T {
     todo!("TODO todoval")
 }
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index deccffe..44a4f35 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -220,7 +220,7 @@ where
 async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let (head, _body) = req.into_parts();
-    let query = disk::cache::Query::from_request(&head)?;
+    let query = disk::cache::BinnedQuery::from_request(&head)?;
     let ret = match disk::cache::binned_bytes_for_http(node_config, &query).await {
         Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s, format!("desc-BINNED")))?,
         Err(e) => {
@@ -234,7 +234,7 @@ async fn binned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
 async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
     let (head, _body) = req.into_parts();
     let q = PreBinnedQuery::from_request(&head)?;
-    let desc = format!("pre-b-{}", q.patch.bin_t_len() / 1000000000);
+    let desc = format!("pre-b-{}", q.patch().bin_t_len() / 1000000000);
     let span1 = span!(Level::INFO, "httpret::prebinned", desc = &desc.as_str());
     span1.in_scope(|| {
         trace!("prebinned");
@@ -243,8 +243,8 @@ async fn prebinned(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
             s,
             format!(
                 "pre-b-{}-p-{}",
-                q.patch.bin_t_len() / 1000000000,
-                q.patch.patch_beg() / 1000000000,
+                q.patch().bin_t_len() / 1000000000,
+                q.patch().patch_beg() / 1000000000,
             ),
         ))?,
         Err(e) => {
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index 1d92ece..2e28938 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -397,6 +397,13 @@ impl PreBinnedPatchCoord {
         self.spec.patch_t_len() * (self.ix + 1)
     }
 
+    pub fn patch_range(&self) -> NanoRange {
+        NanoRange {
+            beg: self.patch_beg(),
+            end: self.patch_end(),
+        }
+    }
+
     pub fn bin_count(&self) -> u64 {
         self.patch_t_len() / self.spec.bin_t_len
     }
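For orientation, a short sketch of how the pieces introduced above are meant to compose. This is not part of the patch; the numeric values and channel names are invented, and only items visible in the diff (PreBinnedQuery::new, make_query_string, CacheUsage, AggKind::DimXBins1, PreBinnedPatchCoord::new, Channel) are assumed to exist with the shapes shown there.

    // Hypothetical usage, values invented for illustration.
    let patch = PreBinnedPatchCoord::new(3_600_000_000_000, 0); // (bin_t_len in ns, patch_ix)
    let channel = Channel {
        backend: "test-backend".into(),
        name: "test-channel".into(),
    };
    let query = PreBinnedQuery::new(patch, channel, AggKind::DimXBins1, CacheUsage::Ignore);
    // make_query_string() yields the query part that PreBinnedValueFetchedStream
    // appends to http://{host}:{port}/api/1/prebinned?
    // e.g. "<patch params>&channel_backend=test-backend&channel_name=test-channel&agg_kind=DimXBins1"
    let qs = query.make_query_string();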