diff --git a/disk/Cargo.toml b/disk/Cargo.toml index d881c80..9fcac4f 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] http = "0.2" +hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" chrono = { version = "0.4.19", features = ["serde"] } @@ -18,6 +19,7 @@ futures-util = "0.3.14" async-stream = "0.3.0" tracing = "0.1.25" hex = "0.4.3" +tiny-keccak = { version = "2.0", features = ["sha3"] } err = { path = "../err" } taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 6b1fac3..2d0cf35 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -1,20 +1,24 @@ #[allow(unused_imports)] use tracing::{error, warn, info, debug, trace}; use err::Error; -use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchGridSpec}; -use futures_core::Stream; +use std::future::ready; use std::pin::Pin; use std::task::{Context, Poll}; +use futures_core::Stream; +use futures_util::{StreamExt, FutureExt, pin_mut}; use bytes::Bytes; use chrono::{DateTime, Utc}; +use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchGridSpec, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel}; use crate::agg::MinMaxAvgScalarBinBatch; -use futures_util::StreamExt; +use http::uri::Scheme; +use tiny_keccak::Hasher; #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct Query { range: NanoRange, count: u64, agg_kind: AggKind, + channel: Channel, } impl Query { @@ -30,6 +34,11 @@ impl Query { }, count: params.get("bin_count").ok_or(Error::with_msg("missing beg_date"))?.parse().unwrap(), agg_kind: AggKind::DimXBins1, + channel: Channel { + backend: params.get("channel_backend").unwrap().into(), + keyspace: params.get("channel_keyspace").unwrap().parse().unwrap(), + name: params.get("channel_name").unwrap().into(), + }, }; info!("Query::from_request {:?}", ret); Ok(ret) @@ -51,26 +60,23 @@ pub fn binned_bytes_for_http(params: BinParams, query: &Query) -> Result { info!("GOT PreBinnedPatchGridSpec: {:?}", spec); - let mut it = netpod::PreBinnedPatchIterator::from_range(spec.clone()); - for coord in it { - // Iterate over the patches. - // Request the patch from each node. - // Merge. - // Agg+Bin. - // Deliver. - info!("coord: {:?}", coord) - } + warn!("Pass here to BinnedStream what kind of Agg, range, ..."); + let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), params.cluster.clone()); + // Iterate over the patches. + // Request the patch from each node. + // Merge. + // Agg+Bin. + // Deliver. + let ret = BinnedBytesForHttpStream { + inp: s1, + }; + Ok(ret) } None => { // Merge raw data todo!() } } - - let ret = BinnedBytesForHttpStream { - inp: todo!(), - }; - Ok(ret) } @@ -87,47 +93,153 @@ impl Stream for BinnedBytesForHttpStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; error!("TODO translate the structured stream into plain bytes for http"); - self.inp.poll_next_unpin(cx); - Ready(None) + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => { + Ready(Some(Ok(Bytes::new()))) + } + Ready(Some(Err(e))) => Ready(Some(Err(e))), + Ready(None) => Ready(None), + Pending => Pending, + } } } -pub struct PreBinnedViaHttpStreamStream { +pub struct PreBinnedValueStream { + uri: http::Uri, + patch_coord: PreBinnedPatchCoord, + resfut: Option, + res: Option>, } -impl Stream for PreBinnedViaHttpStreamStream { +impl PreBinnedValueStream { + + pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, cluster: Cluster) -> Self { + let nodeix = node_ix_for_patch(&patch_coord, &channel, &cluster); + warn!("TODO PASS THE KIND OF AGG"); + let node = &cluster.nodes[nodeix]; + let uri: hyper::Uri = format!( + "http://{}:{}/api/1/prebinned?beg={}&end={}&channel={}", + node.host, + node.port, + patch_coord.range.beg, + patch_coord.range.end, + channel.name, + ).parse().unwrap(); + Self { + uri, + patch_coord, + resfut: None, + res: None, + } + } + +} + + +pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> usize { + let mut hash = tiny_keccak::Sha3::v256(); + hash.update(channel.name.as_bytes()); + 0 +} + + +impl Stream for PreBinnedValueStream { // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) - type Item = Result + Send>>, Error>; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; // TODO when requested next, create the next http request, connect, check headers // and as soon as ready, wrap the body in the appropriate parser and return the stream. // The wire protocol is not yet defined. - todo!() + 'outer: loop { + break match self.res.as_mut() { + Some(res) => { + pin_mut!(res); + use hyper::body::HttpBody; + match res.poll_data(cx) { + Ready(Some(Ok(k))) => { + Pending + } + Ready(Some(Err(e))) => Ready(Some(Err(e.into()))), + Ready(None) => Ready(None), + Pending => Pending, + } + } + None => { + match self.resfut.as_mut() { + Some(mut resfut) => { + match resfut.poll_unpin(cx) { + Ready(res) => { + match res { + Ok(res) => { + info!("Got result from subrequest: {:?}", res); + self.res = Some(res); + continue 'outer; + } + Err(e) => Ready(Some(Err(e.into()))), + } + } + Pending => { + Pending + } + } + } + None => { + + let req = hyper::Request::builder() + .method(http::Method::GET) + .uri(&self.uri) + .body(hyper::Body::empty())?; + let client = hyper::Client::new(); + self.resfut = Some(client.request(req)); + continue 'outer; + } + } + } + } + } } } -// NOTE provides structures of merged, aggregated and binned to the final user grid pub struct BinnedStream { - - // TODO what kind of stream do I need here, and who builds it? - inp: Pin> + Send>>, } +impl BinnedStream { + + pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, cluster: Cluster) -> Self { + let mut patch_it = patch_it; + let inp = futures_util::stream::iter(patch_it) + .map(move |coord| { + PreBinnedValueStream::new(coord, channel.clone(), cluster.clone()) + }) + .flatten(); + Self { + inp: Box::pin(inp), + } + } + +} + impl Stream for BinnedStream { // TODO make this generic over all possible things - type Item = MinMaxAvgScalarBinBatch; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - self.inp.poll_next_unpin(cx); - todo!() + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => { + Ready(Some(Ok(k))) + } + Ready(Some(Err(e))) => Ready(Some(Err(e))), + Ready(None) => Ready(None), + Pending => Pending, + } } } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index f976d54..bd5b70a 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -133,7 +133,7 @@ impl hyper::body::HttpBody for BodyStreamWrap { async fn binned(req: Request, hconf: HostConf) -> Result, Error> { let (head, body) = req.into_parts(); - let params = netpod::query_params(head.uri.query()); + //let params = netpod::query_params(head.uri.query()); // TODO // Channel, time range, bin size. diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 280137e..1a4df93 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -286,7 +286,7 @@ impl PreBinnedPatchGridSpec { else { i1 -= 1; let t = thresholds[i1]; - info!("look at threshold {} bs {}", t, bs); + //info!("look at threshold {} bs {}", t, bs); if t <= bs { let bs = t; let ts1 = range.beg / bs * bs; @@ -314,7 +314,7 @@ impl PreBinnedPatchGridSpec { #[derive(Clone, Debug)] pub struct PreBinnedPatchCoord { - range: NanoRange, + pub range: NanoRange, } pub struct PreBinnedPatchIterator { diff --git a/retrieval/src/lib.rs b/retrieval/src/lib.rs index b851699..2b57c7f 100644 --- a/retrieval/src/lib.rs +++ b/retrieval/src/lib.rs @@ -20,7 +20,7 @@ async fn get_cached_0_inner() -> Result<(), Error> { let hosts = spawn_test_hosts(&cluster); let req = hyper::Request::builder() .method(http::Method::GET) - .uri(format!("http://{}:{}/api/1/binned?bin_count=4&beg_date=1970-01-01T00:00:10.000Z&end_date=1970-01-01T00:00:51.000Z", node0.host, node0.port)) + .uri(format!("http://{}:{}/api/1/binned?channel_backend=testbackend&channel_keyspace=3&channel_name=wave1&bin_count=4&beg_date=1970-01-01T00:00:10.000Z&end_date=1970-01-01T00:00:51.000Z", node0.host, node0.port)) .body(Body::empty())?; let client = hyper::Client::new(); let res = client.request(req).await?;