This commit is contained in:
Dominik Werder
2021-04-12 20:55:44 +02:00
parent 7f52158971
commit 9941960e93
5 changed files with 194 additions and 16 deletions
+87 -6
View File
@@ -1,17 +1,19 @@
#[allow(unused_imports)] #[allow(unused_imports)]
use tracing::{error, warn, info, debug, trace}; use tracing::{error, warn, info, debug, trace};
use err::Error; use err::Error;
use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos}; use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos, PreBinnedPatchGridSpec};
use futures_core::Stream; use futures_core::Stream;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use crate::agg::MinMaxAvgScalarBinBatch;
use futures_util::StreamExt;
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct Query { pub struct Query {
range: NanoRange, range: NanoRange,
count: u64,
agg_kind: AggKind, agg_kind: AggKind,
} }
@@ -26,6 +28,7 @@ impl Query {
beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(), beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
end: end_date.parse::<DateTime<Utc>>()?.to_nanos(), end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
}, },
count: params.get("bin_count").ok_or(Error::with_msg("missing beg_date"))?.parse().unwrap(),
agg_kind: AggKind::DimXBins1, agg_kind: AggKind::DimXBins1,
}; };
info!("Query::from_request {:?}", ret); info!("Query::from_request {:?}", ret);
@@ -40,25 +43,103 @@ pub struct BinParams {
pub cluster: Cluster, pub cluster: Cluster,
} }
pub fn binned_bytes_for_http(params: BinParams) -> Result<BinnedBytesForHttpStream, Error> { pub fn binned_bytes_for_http(params: BinParams, query: &Query) -> Result<BinnedBytesForHttpStream, Error> {
// TODO // TODO
// Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches. // Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches.
let grid = PreBinnedPatchGridSpec::over_range(query.range.clone(), query.count);
match grid {
Some(spec) => {
info!("GOT PreBinnedPatchGridSpec: {:?}", spec);
let mut it = netpod::PreBinnedPatchIterator::from_range(spec.clone());
for coord in it {
// Iterate over the patches.
// Request the patch from each node.
// Merge.
// Agg+Bin.
// Deliver.
info!("coord: {:?}", coord)
}
}
None => {
// Merge raw data
todo!()
}
}
let ret = BinnedBytesForHttpStream {}; let ret = BinnedBytesForHttpStream {
inp: todo!(),
};
Ok(ret) Ok(ret)
} }
pub struct BinnedBytesForHttpStream { pub struct BinnedBytesForHttpStream {
inp: BinnedStream,
}
impl BinnedBytesForHttpStream {
} }
impl Stream for BinnedBytesForHttpStream { impl Stream for BinnedBytesForHttpStream {
type Item = Result<Bytes, Error>; type Item = Result<Bytes, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// TODO
use Poll::*; use Poll::*;
error!("TODO translate the structured stream into plain bytes for http");
self.inp.poll_next_unpin(cx);
Ready(None) Ready(None)
} }
} }
pub struct PreBinnedViaHttpStreamStream {
}
impl Stream for PreBinnedViaHttpStreamStream {
// TODO need this generic for scalar and array (when wave is not binned down to a single scalar point)
type Item = Result<Pin<Box<dyn Stream<Item=MinMaxAvgScalarBinBatch> + Send>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
// TODO when requested next, create the next http request, connect, check headers
// and as soon as ready, wrap the body in the appropriate parser and return the stream.
// The wire protocol is not yet defined.
todo!()
}
}
// NOTE provides structures of merged, aggregated and binned to the final user grid
pub struct BinnedStream {
// TODO what kind of stream do I need here, and who builds it?
inp: Pin<Box<dyn Stream<Item=Result<MinMaxAvgScalarBinBatch, Error>> + Send>>,
}
impl Stream for BinnedStream {
// TODO make this generic over all possible things
type Item = MinMaxAvgScalarBinBatch;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
self.inp.poll_next_unpin(cx);
todo!()
}
}
/// Placeholder type without fields; the only thing declared for it is a
/// conversion into `Bytes`, which is not implemented yet.
pub struct SomeReturnThing {}

impl From<SomeReturnThing> for Bytes {
    /// Not implemented yet: always panics via `todo!`.
    fn from(_thing: SomeReturnThing) -> Self {
        todo!("TODO convert result to octets")
    }
}
+2 -2
View File
@@ -141,12 +141,12 @@ async fn binned(req: Request<Body>, hconf: HostConf) -> Result<Response<Body>, E
// Look up and parse channel config. // Look up and parse channel config.
// Extract the relevant channel config entry. // Extract the relevant channel config entry.
disk::cache::Query::from_request(&head)?; let query = disk::cache::Query::from_request(&head)?;
let params = BinParams { let params = BinParams {
node: hconf.node.clone(), node: hconf.node.clone(),
cluster: hconf.cluster.clone(), cluster: hconf.cluster.clone(),
}; };
let ret = match disk::cache::binned_bytes_for_http(params) { let ret = match disk::cache::binned_bytes_for_http(params, &query) {
Ok(s) => { Ok(s) => {
response(StatusCode::OK) response(StatusCode::OK)
.body(Body::wrap_stream(s))? .body(Body::wrap_stream(s))?
+1
View File
@@ -10,4 +10,5 @@ async-channel = "1.6"
bytes = "1.0.1" bytes = "1.0.1"
chrono = { version = "0.4.19", features = ["serde"] } chrono = { version = "0.4.19", features = ["serde"] }
futures-core = "0.3.12" futures-core = "0.3.12"
tracing = "0.1.25"
err = { path = "../err" } err = { path = "../err" }
+103 -7
View File
@@ -1,3 +1,5 @@
#[allow(unused_imports)]
use tracing::{error, warn, info, debug, trace};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use err::Error; use err::Error;
use std::path::PathBuf; use std::path::PathBuf;
@@ -147,6 +149,14 @@ pub struct NanoRange {
pub end: u64, pub end: u64,
} }
impl NanoRange {
    /// Length of the range, computed as `end` minus `beg`.
    ///
    /// This is a plain `u64` subtraction, so `end` is expected to be at or
    /// after `beg`.
    pub fn delta(&self) -> u64 {
        let Self { beg, end, .. } = self;
        end - beg
    }
}
#[test] #[test]
fn serde_channel() { fn serde_channel() {
@@ -211,9 +221,13 @@ impl BinSpecDimT {
]; ];
let mut i1 = 0; let mut i1 = 0;
let bs = loop { let bs = loop {
if i1 >= thresholds.len() { break *thresholds.last().unwrap(); } if i1 >= thresholds.len() {
break *thresholds.last().unwrap();
}
let t = thresholds[i1]; let t = thresholds[i1];
if bs < t { break t; } if bs <= t {
break t;
}
i1 += 1; i1 += 1;
}; };
//info!("INPUT TS {} {}", ts1, ts2); //info!("INPUT TS {} {}", ts1, ts2);
@@ -222,7 +236,7 @@ impl BinSpecDimT {
let ts2 = (ts2 + bs - 1) / bs * bs; let ts2 = (ts2 + bs - 1) / bs * bs;
//info!("ADJUSTED TS {} {}", ts1, ts2); //info!("ADJUSTED TS {} {}", ts1, ts2);
BinSpecDimT { BinSpecDimT {
count, count: (ts2 - ts1) / bs,
ts1, ts1,
ts2, ts2,
bs, bs,
@@ -239,23 +253,105 @@ impl BinSpecDimT {
} }
/// Describes a grid of equally sized pre-binned patches over a time range.
///
/// Values are produced by `PreBinnedPatchGridSpec::over_range`, which stores
/// a range whose ends are multiples of `bs` and sets `count` to
/// `range.delta() / bs`.
#[derive(Clone, Debug)]
pub struct PreBinnedPatchGridSpec {
// Range covered by the grid; `over_range` aligns both ends to multiples of `bs`.
range: NanoRange,
// Length of a single patch, in the same unit as `range.beg` / `range.end`.
bs: u64,
// Number of patches of length `bs` that make up `range`.
count: u64,
}
impl PreBinnedPatchGridSpec {
    /// Chooses a pre-binned patch grid for `range` when it is split into
    /// roughly `count` bins.
    ///
    /// The requested bin length is `range.delta() / count`. The largest entry
    /// of a fixed threshold table that does not exceed this length becomes
    /// the patch length. `range` is then widened so that both ends fall on
    /// multiples of the patch length, and the number of patches in the
    /// widened range is stored as `count`.
    ///
    /// Returns `None` when every threshold is larger than the requested bin
    /// length.
    ///
    /// # Panics
    ///
    /// Panics if `count` is outside `1..=2000` or if `range.delta()` exceeds
    /// `DAY * 14`.
    pub fn over_range(range: NanoRange, count: u64) -> Option<Self> {
        use timeunits::*;
        assert!(count >= 1);
        assert!(count <= 2000);
        let dt = range.delta();
        assert!(dt <= DAY * 14);
        let bs = dt / count;
        info!("BASIC bs {}", bs);
        let thresholds = [SEC * 10, MIN * 10, HOUR, HOUR * 4, DAY, DAY * 4];
        // Walk the table from the coarsest to the finest patch length and
        // take the first one that fits into the requested bin length.
        for &candidate in thresholds.iter().rev() {
            info!("look at threshold {} bs {}", candidate, bs);
            if candidate > bs {
                continue;
            }
            // Round the start down and the end up to the patch grid.
            let grid_beg = range.beg / candidate * candidate;
            let grid_end = (range.end + candidate - 1) / candidate * candidate;
            if (grid_end - grid_beg) % candidate != 0 {
                panic!();
            }
            let aligned = NanoRange {
                beg: grid_beg,
                end: grid_end,
            };
            let patch_count = aligned.delta() / candidate;
            return Some(Self {
                range: aligned,
                bs: candidate,
                count: patch_count,
            });
        }
        None
    }
}
/// Identifies one pre-binned patch by the range it covers.
///
/// This is the item type yielded by `PreBinnedPatchIterator`.
#[derive(Clone, Debug)]
pub struct PreBinnedPatchCoord {
// Span of this patch; the iterator fills it with
// `spec.range.beg + ix * spec.bs` up to `spec.range.beg + (ix + 1) * spec.bs`.
range: NanoRange,
}
pub struct PreBinnedPatchIterator { pub struct PreBinnedPatchIterator {
spec: PreBinnedPatchGridSpec,
agg_kind: AggKind, agg_kind: AggKind,
ix: u64,
} }
impl PreBinnedPatchIterator { impl PreBinnedPatchIterator {
pub fn iter_blocks_for_request(range: NanoRange) -> Self { pub fn from_range(spec: PreBinnedPatchGridSpec) -> Self {
todo!() Self {
spec,
agg_kind: AggKind::DimXBins1,
ix: 0,
}
} }
} }
impl Iterator for PreBinnedPatchIterator { impl Iterator for PreBinnedPatchIterator {
type Item = (); type Item = PreBinnedPatchCoord;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
todo!() if self.ix >= self.spec.count {
None
}
else {
let ret = Self::Item {
range: NanoRange {
beg: self.spec.range.beg + self.ix * self.spec.bs,
end: self.spec.range.beg + (self.ix + 1) * self.spec.bs,
},
};
self.ix += 1;
Some(ret)
}
} }
} }
+1 -1
View File
@@ -20,7 +20,7 @@ async fn get_cached_0_inner() -> Result<(), Error> {
let hosts = spawn_test_hosts(&cluster); let hosts = spawn_test_hosts(&cluster);
let req = hyper::Request::builder() let req = hyper::Request::builder()
.method(http::Method::GET) .method(http::Method::GET)
.uri(format!("http://{}:{}/api/1/binned?beg_date=1970-01-01T00:00:01.4253Z&end_date=1970-01-01T00:00:04.000Z", node0.host, node0.port)) .uri(format!("http://{}:{}/api/1/binned?bin_count=4&beg_date=1970-01-01T00:00:10.000Z&end_date=1970-01-01T00:00:51.000Z", node0.host, node0.port))
.body(Body::empty())?; .body(Body::empty())?;
let client = hyper::Client::new(); let client = hyper::Client::new();
let res = client.request(req).await?; let res = client.request(req).await?;