WIP on first client-facing stages of cache pipeline
This commit is contained in:
@@ -6,6 +6,7 @@ edition = "2018"
|
||||
|
||||
[dependencies]
|
||||
http = "0.2"
|
||||
hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
chrono = { version = "0.4.19", features = ["serde"] }
|
||||
@@ -18,6 +19,7 @@ futures-util = "0.3.14"
|
||||
async-stream = "0.3.0"
|
||||
tracing = "0.1.25"
|
||||
hex = "0.4.3"
|
||||
tiny-keccak = { version = "2.0", features = ["sha3"] }
|
||||
err = { path = "../err" }
|
||||
taskrun = { path = "../taskrun" }
|
||||
netpod = { path = "../netpod" }
|
||||
|
||||
@@ -1,20 +1,24 @@
|
||||
#[allow(unused_imports)]
|
||||
use tracing::{error, warn, info, debug, trace};
|
||||
use err::Error;
|
||||
use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchGridSpec};
|
||||
use futures_core::Stream;
|
||||
use std::future::ready;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use futures_core::Stream;
|
||||
use futures_util::{StreamExt, FutureExt, pin_mut};
|
||||
use bytes::Bytes;
|
||||
use chrono::{DateTime, Utc};
|
||||
use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchGridSpec, PreBinnedPatchIterator, PreBinnedPatchCoord, Channel};
|
||||
use crate::agg::MinMaxAvgScalarBinBatch;
|
||||
use futures_util::StreamExt;
|
||||
use http::uri::Scheme;
|
||||
use tiny_keccak::Hasher;
|
||||
|
||||
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Query {
|
||||
range: NanoRange,
|
||||
count: u64,
|
||||
agg_kind: AggKind,
|
||||
channel: Channel,
|
||||
}
|
||||
|
||||
impl Query {
|
||||
@@ -30,6 +34,11 @@ impl Query {
|
||||
},
|
||||
count: params.get("bin_count").ok_or(Error::with_msg("missing beg_date"))?.parse().unwrap(),
|
||||
agg_kind: AggKind::DimXBins1,
|
||||
channel: Channel {
|
||||
backend: params.get("channel_backend").unwrap().into(),
|
||||
keyspace: params.get("channel_keyspace").unwrap().parse().unwrap(),
|
||||
name: params.get("channel_name").unwrap().into(),
|
||||
},
|
||||
};
|
||||
info!("Query::from_request {:?}", ret);
|
||||
Ok(ret)
|
||||
@@ -51,26 +60,23 @@ pub fn binned_bytes_for_http(params: BinParams, query: &Query) -> Result<BinnedB
|
||||
match grid {
|
||||
Some(spec) => {
|
||||
info!("GOT PreBinnedPatchGridSpec: {:?}", spec);
|
||||
let mut it = netpod::PreBinnedPatchIterator::from_range(spec.clone());
|
||||
for coord in it {
|
||||
// Iterate over the patches.
|
||||
// Request the patch from each node.
|
||||
// Merge.
|
||||
// Agg+Bin.
|
||||
// Deliver.
|
||||
info!("coord: {:?}", coord)
|
||||
}
|
||||
warn!("Pass here to BinnedStream what kind of Agg, range, ...");
|
||||
let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), params.cluster.clone());
|
||||
// Iterate over the patches.
|
||||
// Request the patch from each node.
|
||||
// Merge.
|
||||
// Agg+Bin.
|
||||
// Deliver.
|
||||
let ret = BinnedBytesForHttpStream {
|
||||
inp: s1,
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
None => {
|
||||
// Merge raw data
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
let ret = BinnedBytesForHttpStream {
|
||||
inp: todo!(),
|
||||
};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
|
||||
@@ -87,47 +93,153 @@ impl Stream for BinnedBytesForHttpStream {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
error!("TODO translate the structured stream into plain bytes for http");
|
||||
self.inp.poll_next_unpin(cx);
|
||||
Ready(None)
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
Ready(Some(Ok(Bytes::new())))
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => Ready(None),
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
pub struct PreBinnedViaHttpStreamStream {
|
||||
pub struct PreBinnedValueStream {
|
||||
uri: http::Uri,
|
||||
patch_coord: PreBinnedPatchCoord,
|
||||
resfut: Option<hyper::client::ResponseFuture>,
|
||||
res: Option<hyper::Response<hyper::Body>>,
|
||||
}
|
||||
|
||||
impl Stream for PreBinnedViaHttpStreamStream {
|
||||
impl PreBinnedValueStream {
|
||||
|
||||
pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, cluster: Cluster) -> Self {
|
||||
let nodeix = node_ix_for_patch(&patch_coord, &channel, &cluster);
|
||||
warn!("TODO PASS THE KIND OF AGG");
|
||||
let node = &cluster.nodes[nodeix];
|
||||
let uri: hyper::Uri = format!(
|
||||
"http://{}:{}/api/1/prebinned?beg={}&end={}&channel={}",
|
||||
node.host,
|
||||
node.port,
|
||||
patch_coord.range.beg,
|
||||
patch_coord.range.end,
|
||||
channel.name,
|
||||
).parse().unwrap();
|
||||
Self {
|
||||
uri,
|
||||
patch_coord,
|
||||
resfut: None,
|
||||
res: None,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> usize {
|
||||
let mut hash = tiny_keccak::Sha3::v256();
|
||||
hash.update(channel.name.as_bytes());
|
||||
0
|
||||
}
|
||||
|
||||
|
||||
impl Stream for PreBinnedValueStream {
|
||||
// TODO need this generic for scalar and array (when wave is not binned down to a single scalar point)
|
||||
type Item = Result<Pin<Box<dyn Stream<Item=MinMaxAvgScalarBinBatch> + Send>>, Error>;
|
||||
type Item = Result<MinMaxAvgScalarBinBatch, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
// TODO when requested next, create the next http request, connect, check headers
|
||||
// and as soon as ready, wrap the body in the appropriate parser and return the stream.
|
||||
// The wire protocol is not yet defined.
|
||||
todo!()
|
||||
'outer: loop {
|
||||
break match self.res.as_mut() {
|
||||
Some(res) => {
|
||||
pin_mut!(res);
|
||||
use hyper::body::HttpBody;
|
||||
match res.poll_data(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
Pending
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e.into()))),
|
||||
Ready(None) => Ready(None),
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
None => {
|
||||
match self.resfut.as_mut() {
|
||||
Some(mut resfut) => {
|
||||
match resfut.poll_unpin(cx) {
|
||||
Ready(res) => {
|
||||
match res {
|
||||
Ok(res) => {
|
||||
info!("Got result from subrequest: {:?}", res);
|
||||
self.res = Some(res);
|
||||
continue 'outer;
|
||||
}
|
||||
Err(e) => Ready(Some(Err(e.into()))),
|
||||
}
|
||||
}
|
||||
Pending => {
|
||||
Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
|
||||
let req = hyper::Request::builder()
|
||||
.method(http::Method::GET)
|
||||
.uri(&self.uri)
|
||||
.body(hyper::Body::empty())?;
|
||||
let client = hyper::Client::new();
|
||||
self.resfut = Some(client.request(req));
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
// NOTE provides structures of merged, aggregated and binned to the final user grid
|
||||
pub struct BinnedStream {
|
||||
|
||||
// TODO what kind of stream do I need here, and who builds it?
|
||||
|
||||
inp: Pin<Box<dyn Stream<Item=Result<MinMaxAvgScalarBinBatch, Error>> + Send>>,
|
||||
}
|
||||
|
||||
impl BinnedStream {
|
||||
|
||||
pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, cluster: Cluster) -> Self {
|
||||
let mut patch_it = patch_it;
|
||||
let inp = futures_util::stream::iter(patch_it)
|
||||
.map(move |coord| {
|
||||
PreBinnedValueStream::new(coord, channel.clone(), cluster.clone())
|
||||
})
|
||||
.flatten();
|
||||
Self {
|
||||
inp: Box::pin(inp),
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl Stream for BinnedStream {
|
||||
// TODO make this generic over all possible things
|
||||
type Item = MinMaxAvgScalarBinBatch;
|
||||
type Item = Result<MinMaxAvgScalarBinBatch, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
self.inp.poll_next_unpin(cx);
|
||||
todo!()
|
||||
match self.inp.poll_next_unpin(cx) {
|
||||
Ready(Some(Ok(k))) => {
|
||||
Ready(Some(Ok(k)))
|
||||
}
|
||||
Ready(Some(Err(e))) => Ready(Some(Err(e))),
|
||||
Ready(None) => Ready(None),
|
||||
Pending => Pending,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -133,7 +133,7 @@ impl hyper::body::HttpBody for BodyStreamWrap {
|
||||
|
||||
async fn binned(req: Request<Body>, hconf: HostConf) -> Result<Response<Body>, Error> {
|
||||
let (head, body) = req.into_parts();
|
||||
let params = netpod::query_params(head.uri.query());
|
||||
//let params = netpod::query_params(head.uri.query());
|
||||
|
||||
// TODO
|
||||
// Channel, time range, bin size.
|
||||
|
||||
@@ -286,7 +286,7 @@ impl PreBinnedPatchGridSpec {
|
||||
else {
|
||||
i1 -= 1;
|
||||
let t = thresholds[i1];
|
||||
info!("look at threshold {} bs {}", t, bs);
|
||||
//info!("look at threshold {} bs {}", t, bs);
|
||||
if t <= bs {
|
||||
let bs = t;
|
||||
let ts1 = range.beg / bs * bs;
|
||||
@@ -314,7 +314,7 @@ impl PreBinnedPatchGridSpec {
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PreBinnedPatchCoord {
|
||||
range: NanoRange,
|
||||
pub range: NanoRange,
|
||||
}
|
||||
|
||||
pub struct PreBinnedPatchIterator {
|
||||
|
||||
@@ -20,7 +20,7 @@ async fn get_cached_0_inner() -> Result<(), Error> {
|
||||
let hosts = spawn_test_hosts(&cluster);
|
||||
let req = hyper::Request::builder()
|
||||
.method(http::Method::GET)
|
||||
.uri(format!("http://{}:{}/api/1/binned?bin_count=4&beg_date=1970-01-01T00:00:10.000Z&end_date=1970-01-01T00:00:51.000Z", node0.host, node0.port))
|
||||
.uri(format!("http://{}:{}/api/1/binned?channel_backend=testbackend&channel_keyspace=3&channel_name=wave1&bin_count=4&beg_date=1970-01-01T00:00:10.000Z&end_date=1970-01-01T00:00:51.000Z", node0.host, node0.port))
|
||||
.body(Body::empty())?;
|
||||
let client = hyper::Client::new();
|
||||
let res = client.request(req).await?;
|
||||
|
||||
Reference in New Issue
Block a user