Start to build a test query for caching layer
This commit is contained in:
@@ -8,6 +8,7 @@ use futures_core::Stream;
|
||||
use futures_util::{pin_mut, StreamExt, future::ready};
|
||||
use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node, timeunits::*};
|
||||
use crate::merge::MergeDim1F32Stream;
|
||||
use netpod::BinSpecDimT;
|
||||
|
||||
pub trait AggregatorTdim {
|
||||
type InputValue;
|
||||
@@ -658,7 +659,7 @@ where I: AggregatableTdim + Unpin, T: Stream<Item=Result<I, Error>> + Unpin, I::
|
||||
if self.aggtor.is_none() {
|
||||
let range = self.spec.get_range(self.curbin);
|
||||
//info!("range: {} {}", range.ts1, range.ts2);
|
||||
self.aggtor = Some(k.aggregator_new(range.ts1, range.ts2));
|
||||
self.aggtor = Some(k.aggregator_new(range.beg, range.end));
|
||||
}
|
||||
let ag = self.aggtor.as_mut().unwrap();
|
||||
if ag.ends_before(&k) {
|
||||
@@ -706,69 +707,6 @@ where I: AggregatableTdim + Unpin, T: Stream<Item=Result<I, Error>> + Unpin, I::
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub struct BinSpecDimT {
|
||||
count: u64,
|
||||
ts1: u64,
|
||||
ts2: u64,
|
||||
bs: u64,
|
||||
}
|
||||
|
||||
impl BinSpecDimT {
|
||||
|
||||
pub fn over_range(count: u64, ts1: u64, ts2: u64) -> Self {
|
||||
assert!(count >= 1);
|
||||
assert!(count <= 2000);
|
||||
assert!(ts2 > ts1);
|
||||
let dt = ts2 - ts1;
|
||||
assert!(dt <= DAY * 14);
|
||||
let bs = dt / count;
|
||||
let thresholds = [
|
||||
2, 10, 100,
|
||||
1000, 10_000, 100_000,
|
||||
MU, MU * 10, MU * 100,
|
||||
MS, MS * 10, MS * 100,
|
||||
SEC, SEC * 5, SEC * 10, SEC * 20,
|
||||
MIN, MIN * 5, MIN * 10, MIN * 20,
|
||||
HOUR, HOUR * 2, HOUR * 4, HOUR * 12,
|
||||
DAY, DAY * 2, DAY * 4, DAY * 8, DAY * 16,
|
||||
WEEK, WEEK * 2, WEEK * 10, WEEK * 60,
|
||||
];
|
||||
let mut i1 = 0;
|
||||
let bs = loop {
|
||||
if i1 >= thresholds.len() { break *thresholds.last().unwrap(); }
|
||||
let t = thresholds[i1];
|
||||
if bs < t { break t; }
|
||||
i1 += 1;
|
||||
};
|
||||
//info!("INPUT TS {} {}", ts1, ts2);
|
||||
//info!("chosen binsize: {} {}", i1, bs);
|
||||
let ts1 = ts1 / bs * bs;
|
||||
let ts2 = (ts2 + bs - 1) / bs * bs;
|
||||
//info!("ADJUSTED TS {} {}", ts1, ts2);
|
||||
BinSpecDimT {
|
||||
count,
|
||||
ts1,
|
||||
ts2,
|
||||
bs,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_range(&self, ix: u32) -> TimeRange {
|
||||
TimeRange {
|
||||
ts1: self.ts1 + ix as u64 * self.bs,
|
||||
ts2: self.ts1 + (ix as u64 + 1) * self.bs,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub struct TimeRange {
|
||||
ts1: u64,
|
||||
ts2: u64,
|
||||
}
|
||||
|
||||
|
||||
pub fn make_test_node(ix: u8) -> Node {
|
||||
Node {
|
||||
host: "localhost".into(),
|
||||
|
||||
64
disk/src/cache.rs
Normal file
64
disk/src/cache.rs
Normal file
@@ -0,0 +1,64 @@
|
||||
#[allow(unused_imports)]
|
||||
use tracing::{error, warn, info, debug, trace};
|
||||
use err::Error;
|
||||
use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos};
|
||||
use futures_core::Stream;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use bytes::Bytes;
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
|
||||
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct Query {
|
||||
range: NanoRange,
|
||||
agg_kind: AggKind,
|
||||
}
|
||||
|
||||
impl Query {
|
||||
|
||||
pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
|
||||
let params = netpod::query_params(req.uri.query());
|
||||
let beg_date = params.get("beg_date").ok_or(Error::with_msg("missing beg_date"))?;
|
||||
let end_date = params.get("end_date").ok_or(Error::with_msg("missing end_date"))?;
|
||||
let ret = Query {
|
||||
range: NanoRange {
|
||||
beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
|
||||
end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
|
||||
},
|
||||
agg_kind: AggKind::DimXBins1,
|
||||
};
|
||||
info!("Query::from_request {:?}", ret);
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
pub struct BinParams {
|
||||
pub node: Node,
|
||||
pub cluster: Cluster,
|
||||
}
|
||||
|
||||
pub fn binned_bytes_for_http(params: BinParams) -> Result<BinnedBytesForHttpStream, Error> {
|
||||
|
||||
// TODO
|
||||
// Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches.
|
||||
|
||||
let ret = BinnedBytesForHttpStream {};
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub struct BinnedBytesForHttpStream {
|
||||
}
|
||||
|
||||
impl Stream for BinnedBytesForHttpStream {
|
||||
type Item = Result<Bytes, Error>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
// TODO
|
||||
use Poll::*;
|
||||
Ready(None)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,7 +1,3 @@
|
||||
pub mod agg;
|
||||
pub mod gen;
|
||||
pub mod merge;
|
||||
|
||||
#[allow(unused_imports)]
|
||||
use tracing::{error, warn, info, debug, trace};
|
||||
use err::Error;
|
||||
@@ -18,6 +14,11 @@ use std::path::PathBuf;
|
||||
use bitshuffle::bitshuffle_decompress;
|
||||
use netpod::{ScalarType, Shape, Node, ChannelConfig};
|
||||
|
||||
pub mod agg;
|
||||
pub mod gen;
|
||||
pub mod merge;
|
||||
pub mod cache;
|
||||
|
||||
|
||||
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: &Node) -> Result<netpod::BodyStream, Error> {
|
||||
let path = datapath(query.timebin as u64, &query.channel_config, node);
|
||||
|
||||
Reference in New Issue
Block a user