diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 9f59250..6b1fac3 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,17 +1,19 @@ #[allow(unused_imports)] use tracing::{error, warn, info, debug, trace}; use err::Error; -use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos}; +use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchGridSpec}; use futures_core::Stream; use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; use chrono::{DateTime, Utc}; - +use crate::agg::MinMaxAvgScalarBinBatch; +use futures_util::StreamExt; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct Query { range: NanoRange, + count: u64, agg_kind: AggKind, } @@ -26,6 +28,7 @@ impl Query { beg: beg_date.parse::>()?.to_nanos(), end: end_date.parse::>()?.to_nanos(), }, + count: params.get("bin_count").ok_or(Error::with_msg("missing beg_date"))?.parse().unwrap(), agg_kind: AggKind::DimXBins1, }; info!("Query::from_request {:?}", ret); @@ -40,25 +43,103 @@ pub struct BinParams { pub cluster: Cluster, } -pub fn binned_bytes_for_http(params: BinParams) -> Result { +pub fn binned_bytes_for_http(params: BinParams, query: &Query) -> Result { // TODO // Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches. + let grid = PreBinnedPatchGridSpec::over_range(query.range.clone(), query.count); + match grid { + Some(spec) => { + info!("GOT PreBinnedPatchGridSpec: {:?}", spec); + let mut it = netpod::PreBinnedPatchIterator::from_range(spec.clone()); + for coord in it { + // Iterate over the patches. + // Request the patch from each node. + // Merge. + // Agg+Bin. + // Deliver. + info!("coord: {:?}", coord) + } + } + None => { + // Merge raw data + todo!() + } + } - let ret = BinnedBytesForHttpStream {}; + let ret = BinnedBytesForHttpStream { + inp: todo!(), + }; Ok(ret) } + pub struct BinnedBytesForHttpStream { + inp: BinnedStream, +} + +impl BinnedBytesForHttpStream { } impl Stream for BinnedBytesForHttpStream { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - // TODO + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + error!("TODO translate the structured stream into plain bytes for http"); + self.inp.poll_next_unpin(cx); Ready(None) } } + + +pub struct PreBinnedViaHttpStreamStream { +} + +impl Stream for PreBinnedViaHttpStreamStream { + // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) + type Item = Result + Send>>, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + // TODO when requested next, create the next http request, connect, check headers + // and as soon as ready, wrap the body in the appropriate parser and return the stream. + // The wire protocol is not yet defined. + todo!() + } + +} + + +// NOTE provides structures of merged, aggregated and binned to the final user grid +pub struct BinnedStream { + + // TODO what kind of stream do I need here, and who builds it? + + inp: Pin> + Send>>, +} + +impl Stream for BinnedStream { + // TODO make this generic over all possible things + type Item = MinMaxAvgScalarBinBatch; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + self.inp.poll_next_unpin(cx); + todo!() + } + +} + + + +pub struct SomeReturnThing {} + +impl From for Bytes { + + fn from(k: SomeReturnThing) -> Self { + todo!("TODO convert result to octets") + } + +} diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index a00117b..f976d54 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -141,12 +141,12 @@ async fn binned(req: Request, hconf: HostConf) -> Result, E // Look up and parse channel config. // Extract the relevant channel config entry. - disk::cache::Query::from_request(&head)?; + let query = disk::cache::Query::from_request(&head)?; let params = BinParams { node: hconf.node.clone(), cluster: hconf.cluster.clone(), }; - let ret = match disk::cache::binned_bytes_for_http(params) { + let ret = match disk::cache::binned_bytes_for_http(params, &query) { Ok(s) => { response(StatusCode::OK) .body(Body::wrap_stream(s))? diff --git a/netpod/Cargo.toml b/netpod/Cargo.toml index 3418394..e542c27 100644 --- a/netpod/Cargo.toml +++ b/netpod/Cargo.toml @@ -10,4 +10,5 @@ async-channel = "1.6" bytes = "1.0.1" chrono = { version = "0.4.19", features = ["serde"] } futures-core = "0.3.12" +tracing = "0.1.25" err = { path = "../err" } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 7902646..280137e 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -1,3 +1,5 @@ +#[allow(unused_imports)] +use tracing::{error, warn, info, debug, trace}; use serde::{Serialize, Deserialize}; use err::Error; use std::path::PathBuf; @@ -147,6 +149,14 @@ pub struct NanoRange { pub end: u64, } +impl NanoRange { + + pub fn delta(&self) -> u64 { + self.end - self.beg + } + +} + #[test] fn serde_channel() { @@ -211,9 +221,13 @@ impl BinSpecDimT { ]; let mut i1 = 0; let bs = loop { - if i1 >= thresholds.len() { break *thresholds.last().unwrap(); } + if i1 >= thresholds.len() { + break *thresholds.last().unwrap(); + } let t = thresholds[i1]; - if bs < t { break t; } + if bs <= t { + break t; + } i1 += 1; }; //info!("INPUT TS {} {}", ts1, ts2); @@ -222,7 +236,7 @@ impl BinSpecDimT { let ts2 = (ts2 + bs - 1) / bs * bs; //info!("ADJUSTED TS {} {}", ts1, ts2); BinSpecDimT { - count, + count: (ts2 - ts1) / bs, ts1, ts2, bs, @@ -239,23 +253,105 @@ impl BinSpecDimT { } +#[derive(Clone, Debug)] +pub struct PreBinnedPatchGridSpec { + range: NanoRange, + bs: u64, + count: u64, +} + +impl PreBinnedPatchGridSpec { + + pub fn over_range(range: NanoRange, count: u64) -> Option { + use timeunits::*; + assert!(count >= 1); + assert!(count <= 2000); + let dt = range.delta(); + assert!(dt <= DAY * 14); + let bs = dt / count; + info!("BASIC bs {}", bs); + let thresholds = [ + SEC * 10, + MIN * 10, + HOUR, + HOUR * 4, + DAY, + DAY * 4, + ]; + let mut i1 = thresholds.len(); + loop { + if i1 <= 0 { + break None; + } + else { + i1 -= 1; + let t = thresholds[i1]; + info!("look at threshold {} bs {}", t, bs); + if t <= bs { + let bs = t; + let ts1 = range.beg / bs * bs; + let ts2 = (range.end + bs - 1) / bs * bs; + if (ts2 - ts1) % bs != 0 { + panic!(); + } + let range = NanoRange { + beg: ts1, + end: ts2, + }; + let count = range.delta() / bs; + break Some(Self { + range, + bs, + count, + }); + } + } + } + } + +} + + +#[derive(Clone, Debug)] +pub struct PreBinnedPatchCoord { + range: NanoRange, +} + pub struct PreBinnedPatchIterator { + spec: PreBinnedPatchGridSpec, agg_kind: AggKind, + ix: u64, } impl PreBinnedPatchIterator { - pub fn iter_blocks_for_request(range: NanoRange) -> Self { - todo!() + pub fn from_range(spec: PreBinnedPatchGridSpec) -> Self { + Self { + spec, + agg_kind: AggKind::DimXBins1, + ix: 0, + } } } impl Iterator for PreBinnedPatchIterator { - type Item = (); + type Item = PreBinnedPatchCoord; fn next(&mut self) -> Option { - todo!() + if self.ix >= self.spec.count { + None + } + else { + let ret = Self::Item { + range: NanoRange { + beg: self.spec.range.beg + self.ix * self.spec.bs, + end: self.spec.range.beg + (self.ix + 1) * self.spec.bs, + }, + }; + self.ix += 1; + Some(ret) + } } } diff --git a/retrieval/src/lib.rs b/retrieval/src/lib.rs index 45ab3b9..b851699 100644 --- a/retrieval/src/lib.rs +++ b/retrieval/src/lib.rs @@ -20,7 +20,7 @@ async fn get_cached_0_inner() -> Result<(), Error> { let hosts = spawn_test_hosts(&cluster); let req = hyper::Request::builder() .method(http::Method::GET) - .uri(format!("http://{}:{}/api/1/binned?beg_date=1970-01-01T00:00:01.4253Z&end_date=1970-01-01T00:00:04.000Z", node0.host, node0.port)) + .uri(format!("http://{}:{}/api/1/binned?bin_count=4&beg_date=1970-01-01T00:00:10.000Z&end_date=1970-01-01T00:00:51.000Z", node0.host, node0.port)) .body(Body::empty())?; let client = hyper::Client::new(); let res = client.request(req).await?;