WIP on client and clean up the binned range data structures
This commit is contained in:
@@ -405,7 +405,7 @@ where
|
||||
let ele_count = n1 / ty.bytes() as usize;
|
||||
let mut j = Vec::with_capacity(ele_count);
|
||||
let mut p1 = 0;
|
||||
for i1 in 0..ele_count {
|
||||
for _ in 0..ele_count {
|
||||
let u = unsafe {
|
||||
let mut r = [0u8; BY];
|
||||
std::ptr::copy_nonoverlapping(&decomp[p1], r.as_mut_ptr(), BY);
|
||||
|
||||
@@ -3,13 +3,13 @@ use err::Error;
|
||||
use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
use netpod::BinSpecDimT;
|
||||
use netpod::{BinSpecDimT, BinnedRange};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
pub trait IntoBinnedT {
|
||||
type StreamOut: Stream;
|
||||
fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut;
|
||||
fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut;
|
||||
}
|
||||
|
||||
impl<T, I> IntoBinnedT for T
|
||||
@@ -20,7 +20,7 @@ where
|
||||
{
|
||||
type StreamOut = IntoBinnedTDefaultStream<T, I>;
|
||||
|
||||
fn into_binned_t(self, spec: BinSpecDimT) -> Self::StreamOut {
|
||||
fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut {
|
||||
IntoBinnedTDefaultStream::new(self, spec)
|
||||
}
|
||||
}
|
||||
@@ -32,7 +32,7 @@ where
|
||||
{
|
||||
inp: S,
|
||||
aggtor: Option<I::Aggregator>,
|
||||
spec: BinSpecDimT,
|
||||
spec: BinnedRange,
|
||||
curbin: u32,
|
||||
left: Option<Poll<Option<Result<I, Error>>>>,
|
||||
errored: bool,
|
||||
@@ -45,7 +45,7 @@ where
|
||||
I: AggregatableTdim,
|
||||
S: Stream<Item = Result<I, Error>>,
|
||||
{
|
||||
pub fn new(inp: S, spec: BinSpecDimT) -> Self {
|
||||
pub fn new(inp: S, spec: BinnedRange) -> Self {
|
||||
let range = spec.get_range(0);
|
||||
Self {
|
||||
inp,
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::agg::binnedx::IntoBinnedXBins1;
|
||||
use crate::agg::make_test_node;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::{BinSpecDimT, Channel, ChannelConfig, NanoRange, Nanos, ScalarType, Shape};
|
||||
use netpod::{BinSpecDimT, BinnedRange, Channel, ChannelConfig, NanoRange, Nanos, ScalarType, Shape};
|
||||
use std::future::ready;
|
||||
#[allow(unused_imports)]
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
@@ -66,7 +66,7 @@ async fn agg_x_dim_0_inner() {
|
||||
}
|
||||
k
|
||||
})
|
||||
.into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
|
||||
.into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap())
|
||||
.map(|k| {
|
||||
if false {
|
||||
trace!("after T binning {:?}", k.as_ref().unwrap());
|
||||
@@ -134,7 +134,7 @@ async fn agg_x_dim_1_inner() {
|
||||
//info!("after X binning {:?}", k.as_ref().unwrap());
|
||||
k
|
||||
})
|
||||
.into_binned_t(BinSpecDimT::over_range(bin_count, ts1, ts2))
|
||||
.into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap())
|
||||
.map(|k| {
|
||||
info!("after T binning {:?}", k.as_ref().unwrap());
|
||||
k
|
||||
|
||||
@@ -5,7 +5,7 @@ use futures_core::Stream;
|
||||
use futures_util::StreamExt;
|
||||
#[allow(unused_imports)]
|
||||
use netpod::log::*;
|
||||
use netpod::{AggKind, Channel, NodeConfig, PreBinnedPatchIterator};
|
||||
use netpod::{AggKind, BinSpecDimT, BinnedRange, Channel, NodeConfig, PreBinnedPatchIterator};
|
||||
use netpod::{NanoRange, RetStreamExt};
|
||||
use std::future::ready;
|
||||
use std::pin::Pin;
|
||||
@@ -19,29 +19,30 @@ impl BinnedStream {
|
||||
pub fn new(
|
||||
patch_it: PreBinnedPatchIterator,
|
||||
channel: Channel,
|
||||
range: NanoRange,
|
||||
range: BinnedRange,
|
||||
agg_kind: AggKind,
|
||||
node_config: &NodeConfig,
|
||||
) -> Self {
|
||||
let patches: Vec<_> = patch_it.collect();
|
||||
warn!("BinnedStream will open a PreBinnedValueStream");
|
||||
warn!("BinnedStream::new");
|
||||
for p in &patches {
|
||||
info!("BinnedStream -> patch {:?}", p);
|
||||
info!("BinnedStream::new patch {:?}", p);
|
||||
}
|
||||
use super::agg::binnedt::IntoBinnedT;
|
||||
let inp = futures_util::stream::iter(patches.into_iter())
|
||||
.map({
|
||||
let node_config = node_config.clone();
|
||||
move |coord| PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), &node_config)
|
||||
})
|
||||
.flatten()
|
||||
.only_first_error()
|
||||
.filter_map({
|
||||
let range = range.clone();
|
||||
move |k: Result<MinMaxAvgScalarBinBatch, Error>| {
|
||||
let fit_range = range.full_range();
|
||||
let g = match k {
|
||||
Ok(k) => {
|
||||
use super::agg::{Fits, FitsInside};
|
||||
match k.fits_inside(range.clone()) {
|
||||
match k.fits_inside(fit_range) {
|
||||
Fits::Inside
|
||||
| Fits::PartlyGreater
|
||||
| Fits::PartlyLower
|
||||
@@ -56,6 +57,17 @@ impl BinnedStream {
|
||||
};
|
||||
ready(g)
|
||||
}
|
||||
})
|
||||
.map(|k| k)
|
||||
.into_binned_t(range)
|
||||
.map(|k| match k {
|
||||
Ok(k) => {
|
||||
// TODO instead of converting, let binner already return batches.
|
||||
let mut ret = MinMaxAvgScalarBinBatch::empty();
|
||||
ret.push_single(&k);
|
||||
Ok(ret)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
});
|
||||
Self { inp: Box::pin(inp) }
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ use futures_core::Stream;
|
||||
use futures_util::{pin_mut, StreamExt};
|
||||
use hyper::Response;
|
||||
use netpod::{
|
||||
AggKind, Channel, Cluster, NanoRange, Node, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator,
|
||||
AggKind, BinnedRange, Channel, Cluster, NanoRange, Node, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator,
|
||||
PreBinnedPatchRange, ToNanos,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -73,15 +73,24 @@ pub async fn binned_bytes_for_http(
|
||||
let channel_config = read_local_config(&query.channel, node).await?;
|
||||
let entry = extract_matching_config_entry(range, &channel_config);
|
||||
info!("found config entry {:?}", entry);
|
||||
let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count);
|
||||
match grid {
|
||||
Some(spec) => {
|
||||
info!("GOT PreBinnedPatchGridSpec: {:?}", spec);
|
||||
warn!("Pass here to BinnedStream what kind of Agg, range, ...");
|
||||
let pre_range = PreBinnedPatchRange::covering_range(query.range.clone(), query.count);
|
||||
match pre_range {
|
||||
Some(pre_range) => {
|
||||
info!("Found pre_range: {:?}", pre_range);
|
||||
let range = BinnedRange::covering_range(range.clone(), query.count)
|
||||
.ok_or(Error::with_msg(format!("BinnedRange::covering_range returned None")))?;
|
||||
if range.grid_spec.bin_t_len() > pre_range.grid_spec.bin_t_len() {
|
||||
let msg = format!(
|
||||
"binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
|
||||
pre_range, range
|
||||
);
|
||||
error!("{}", msg);
|
||||
return Err(Error::with_msg(msg));
|
||||
}
|
||||
let s1 = BinnedStream::new(
|
||||
PreBinnedPatchIterator::from_range(spec),
|
||||
PreBinnedPatchIterator::from_range(pre_range),
|
||||
query.channel.clone(),
|
||||
query.range.clone(),
|
||||
range,
|
||||
query.agg_kind.clone(),
|
||||
node_config,
|
||||
);
|
||||
@@ -89,7 +98,7 @@ pub async fn binned_bytes_for_http(
|
||||
Ok(ret)
|
||||
}
|
||||
None => {
|
||||
// Merge raw data
|
||||
// TODO Merge raw data.
|
||||
error!("binned_bytes_for_http TODO merge raw data");
|
||||
todo!()
|
||||
}
|
||||
@@ -153,8 +162,10 @@ pub struct PreBinnedQuery {
|
||||
impl PreBinnedQuery {
|
||||
pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
|
||||
let params = netpod::query_params(req.uri.query());
|
||||
let patch_ix = params.get("patch_ix").unwrap().parse().unwrap();
|
||||
let bin_t_len = params.get("bin_t_len").unwrap().parse().unwrap();
|
||||
let ret = PreBinnedQuery {
|
||||
patch: PreBinnedPatchCoord::from_query_params(¶ms),
|
||||
patch: PreBinnedPatchCoord::new(bin_t_len, patch_ix),
|
||||
agg_kind: AggKind::DimXBins1,
|
||||
channel: Channel {
|
||||
backend: params.get("channel_backend").unwrap().into(),
|
||||
|
||||
7
disk/src/cache/pbv.rs
vendored
7
disk/src/cache/pbv.rs
vendored
@@ -10,7 +10,7 @@ use futures_core::Stream;
|
||||
use futures_util::{FutureExt, StreamExt, TryStreamExt};
|
||||
use netpod::log::*;
|
||||
use netpod::{
|
||||
AggKind, BinSpecDimT, Channel, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator,
|
||||
AggKind, BinSpecDimT, BinnedRange, Channel, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator,
|
||||
PreBinnedPatchRange,
|
||||
};
|
||||
use std::future::{ready, Future};
|
||||
@@ -135,12 +135,13 @@ impl PreBinnedValueStream {
|
||||
// TODO use a ctor, remove from BinSpecDimT the redundant variable.
|
||||
// If given a timestamp range, verify that it divides.
|
||||
// For ranges, use a range type.
|
||||
let spec = BinSpecDimT {
|
||||
let _spec = BinSpecDimT {
|
||||
bs: self.patch_coord.bin_t_len(),
|
||||
ts1: self.patch_coord.patch_beg(),
|
||||
ts2: self.patch_coord.patch_end(),
|
||||
count,
|
||||
};
|
||||
let range = BinnedRange::covering_range(evq.range.clone(), count).unwrap();
|
||||
let s1 = MergedFromRemotes::new(evq, self.node_config.cluster.clone());
|
||||
let s2 = s1
|
||||
.map(|k| {
|
||||
@@ -151,7 +152,7 @@ impl PreBinnedValueStream {
|
||||
}
|
||||
k
|
||||
})
|
||||
.into_binned_t(spec)
|
||||
.into_binned_t(range)
|
||||
.map_ok({
|
||||
let mut a = MinMaxAvgScalarBinBatch::empty();
|
||||
move |k| {
|
||||
|
||||
11
disk/src/cache/pbvfs.rs
vendored
11
disk/src/cache/pbvfs.rs
vendored
@@ -15,6 +15,8 @@ pub struct PreBinnedValueFetchedStream {
|
||||
uri: http::Uri,
|
||||
resfut: Option<hyper::client::ResponseFuture>,
|
||||
res: Option<InMemoryFrameAsyncReadStream<HttpBodyAsAsyncRead>>,
|
||||
errored: bool,
|
||||
completed: bool,
|
||||
}
|
||||
|
||||
impl PreBinnedValueFetchedStream {
|
||||
@@ -44,6 +46,8 @@ impl PreBinnedValueFetchedStream {
|
||||
uri,
|
||||
resfut: None,
|
||||
res: None,
|
||||
errored: false,
|
||||
completed: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -58,6 +62,13 @@ impl Stream for PreBinnedValueFetchedStream {
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
if self.completed {
|
||||
panic!("poll_next on completed");
|
||||
}
|
||||
if self.errored {
|
||||
self.completed = true;
|
||||
return Ready(None);
|
||||
}
|
||||
'outer: loop {
|
||||
break if let Some(res) = self.res.as_mut() {
|
||||
pin_mut!(res);
|
||||
|
||||
Reference in New Issue
Block a user