diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 2d0cf35..ad70c4f 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -6,7 +6,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use futures_core::Stream; use futures_util::{StreamExt, FutureExt, pin_mut}; -use bytes::Bytes; +use bytes::{Bytes, BytesMut, BufMut}; use chrono::{DateTime, Utc}; use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchGridSpec, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel}; use crate::agg::MinMaxAvgScalarBinBatch; @@ -53,6 +53,7 @@ pub struct BinParams { } pub fn binned_bytes_for_http(params: BinParams, query: &Query) -> Result { + let agg_kind = AggKind::DimXBins1; // TODO // Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches. @@ -61,7 +62,7 @@ pub fn binned_bytes_for_http(params: BinParams, query: &Query) -> Result { info!("GOT PreBinnedPatchGridSpec: {:?}", spec); warn!("Pass here to BinnedStream what kind of Agg, range, ..."); - let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), params.cluster.clone()); + let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), agg_kind, params.cluster.clone()); // Iterate over the patches. // Request the patch from each node. // Merge. @@ -92,10 +93,11 @@ impl Stream for BinnedBytesForHttpStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - error!("TODO translate the structured stream into plain bytes for http"); match self.inp.poll_next_unpin(cx) { Ready(Some(Ok(k))) => { - Ready(Some(Ok(Bytes::new()))) + let mut buf = BytesMut::with_capacity(250); + buf.put(&b"TODO serialize to bytes\n"[..]); + Ready(Some(Ok(buf.freeze()))) } Ready(Some(Err(e))) => Ready(Some(Err(e))), Ready(None) => Ready(None), @@ -115,17 +117,17 @@ pub struct PreBinnedValueStream { impl PreBinnedValueStream { - pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, cluster: Cluster) -> Self { + pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, cluster: Cluster) -> Self { let nodeix = node_ix_for_patch(&patch_coord, &channel, &cluster); - warn!("TODO PASS THE KIND OF AGG"); let node = &cluster.nodes[nodeix]; let uri: hyper::Uri = format!( - "http://{}:{}/api/1/prebinned?beg={}&end={}&channel={}", + "http://{}:{}/api/1/prebinned?beg={}&end={}&channel={}&agg_kind={:?}", node.host, node.port, patch_coord.range.beg, patch_coord.range.end, channel.name, + agg_kind, ).parse().unwrap(); Self { uri, @@ -212,11 +214,11 @@ pub struct BinnedStream { impl BinnedStream { - pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, cluster: Cluster) -> Self { + pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, agg_kind: AggKind, cluster: Cluster) -> Self { let mut patch_it = patch_it; let inp = futures_util::stream::iter(patch_it) .map(move |coord| { - PreBinnedValueStream::new(coord, channel.clone(), cluster.clone()) + PreBinnedValueStream::new(coord, channel.clone(), agg_kind.clone(), cluster.clone()) }) .flatten(); Self { diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index bd5b70a..634978d 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -160,6 +160,28 @@ async fn binned(req: Request, hconf: HostConf) -> Result, E } +async fn prebinned(req: Request, hconf: HostConf) -> Result, Error> { + let (head, body) = req.into_parts(); + todo!("create a new PreBinnedQuery and let extract from query"); + let params = BinParams { + node: hconf.node.clone(), + cluster: hconf.cluster.clone(), + }; + todo!("create this new entry point in disk::cache"); + let ret = match Ok(()) { + Ok(s) => { + response(StatusCode::OK) + .body(Body::wrap_stream(______))? + } + Err(e) => { + error!("{:?}", e); + response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())? + } + }; + Ok(ret) +} + + #[derive(Clone)] pub struct HostConf {