Refactor and remove unused code

This commit is contained in:
Dominik Werder
2021-04-29 11:41:58 +02:00
parent 79fbaafe2e
commit 8ca0e1d340
9 changed files with 104 additions and 195 deletions

View File

@@ -40,46 +40,6 @@ pub trait AggregatableTdim {
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
} }
/// DO NOT USE. This is just a dummy for some testing.
impl AggregatableXdim1Bin for () {
type Output = ();
fn into_agg(self) -> Self::Output {
todo!()
}
}
/// DO NOT USE. This is just a dummy for some testing.
impl AggregatableTdim for () {
type Output = ();
type Aggregator = ();
fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator {
todo!()
}
}
/// DO NOT USE. This is just a dummy for some testing.
impl AggregatorTdim for () {
type InputValue = ();
type OutputValue = ();
fn ends_before(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn ends_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn starts_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn ingest(&mut self, _v: &Self::InputValue) {
todo!()
}
fn result(self) -> Self::OutputValue {
todo!()
}
}
/// Batch of events with a scalar (zero dimensions) numeric value. /// Batch of events with a scalar (zero dimensions) numeric value.
pub struct ValuesDim0 { pub struct ValuesDim0 {
tss: Vec<u64>, tss: Vec<u64>,
@@ -217,67 +177,6 @@ pub trait FitsInside {
fn fits_inside(&self, range: NanoRange) -> Fits; fn fits_inside(&self, range: NanoRange) -> Fits;
} }
pub struct MinMaxAvgScalarBinSingle {
ts1: u64,
ts2: u64,
count: u64,
min: f32,
max: f32,
avg: f32,
}
impl std::fmt::Debug for MinMaxAvgScalarBinSingle {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
fmt,
"MinMaxAvgScalarBinSingle ts1 {} ts2 {} count {} min {:7.2e} max {:7.2e} avg {:7.2e}",
self.ts1, self.ts2, self.count, self.min, self.max, self.avg
)
}
}
impl AggregatableTdim for MinMaxAvgScalarBinSingle {
type Output = MinMaxAvgScalarBinSingle;
type Aggregator = MinMaxAvgScalarBinSingleAggregator;
fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator {
todo!()
}
}
impl AggregatableXdim1Bin for MinMaxAvgScalarBinSingle {
type Output = MinMaxAvgScalarBinSingle;
fn into_agg(self) -> Self::Output {
self
}
}
pub struct MinMaxAvgScalarBinSingleAggregator {}
impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator {
type InputValue = MinMaxAvgScalarBinSingle;
type OutputValue = MinMaxAvgScalarBinSingle;
fn ends_before(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn ends_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn starts_after(&self, _inp: &Self::InputValue) -> bool {
todo!()
}
fn ingest(&mut self, _v: &Self::InputValue) {
todo!()
}
fn result(self) -> Self::OutputValue {
todo!()
}
}
pub struct Dim0F32Stream<S> pub struct Dim0F32Stream<S>
where where
S: Stream<Item = Result<EventFull, Error>>, S: Stream<Item = Result<EventFull, Error>>,

View File

@@ -71,7 +71,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*; use Poll::*;
if self.completed { if self.completed {
panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed"); panic!("IntoBinnedTDefaultStream poll_next on completed");
} }
if self.errored { if self.errored {
self.completed = true; self.completed = true;
@@ -79,7 +79,6 @@ where
} }
'outer: loop { 'outer: loop {
let cur = if let Some(k) = self.left.take() { let cur = if let Some(k) = self.left.take() {
trace!("IntoBinnedTDefaultStream USE LEFTOVER");
k k
} else if self.inp_completed { } else if self.inp_completed {
Ready(None) Ready(None)

View File

@@ -1,5 +1,5 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, MinMaxAvgScalarBinSingle}; use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*; use netpod::log::*;
use netpod::timeunits::SEC; use netpod::timeunits::SEC;
@@ -159,25 +159,25 @@ impl MinMaxAvgScalarEventBatchAggregator {
impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
type InputValue = MinMaxAvgScalarEventBatch; type InputValue = MinMaxAvgScalarEventBatch;
type OutputValue = MinMaxAvgScalarBinSingle; type OutputValue = MinMaxAvgScalarBinBatch;
fn ends_before(&self, inp: &Self::InputValue) -> bool { fn ends_before(&self, inp: &Self::InputValue) -> bool {
match inp.tss.last() { match inp.tss.last() {
Some(ts) => *ts < self.ts1, Some(&ts) => ts < self.ts1,
None => true, None => true,
} }
} }
fn ends_after(&self, inp: &Self::InputValue) -> bool { fn ends_after(&self, inp: &Self::InputValue) -> bool {
match inp.tss.last() { match inp.tss.last() {
Some(ts) => *ts >= self.ts2, Some(&ts) => ts >= self.ts2,
_ => panic!(), _ => panic!(),
} }
} }
fn starts_after(&self, inp: &Self::InputValue) -> bool { fn starts_after(&self, inp: &Self::InputValue) -> bool {
match inp.tss.first() { match inp.tss.first() {
Some(ts) => *ts >= self.ts2, Some(&ts) => ts >= self.ts2,
_ => panic!(), _ => panic!(),
} }
} }
@@ -235,13 +235,13 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
} else { } else {
self.sum / self.count as f32 self.sum / self.count as f32
}; };
MinMaxAvgScalarBinSingle { MinMaxAvgScalarBinBatch {
ts1: self.ts1, ts1s: vec![self.ts1],
ts2: self.ts2, ts2s: vec![self.ts2],
count: self.count, counts: vec![self.count],
min, mins: vec![min],
max, maxs: vec![max],
avg, avgs: vec![avg],
} }
} }
} }

View File

@@ -1,4 +1,4 @@
use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside, MinMaxAvgScalarBinSingle}; use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*; use netpod::log::*;
use netpod::timeunits::SEC; use netpod::timeunits::SEC;
@@ -9,12 +9,12 @@ use std::mem::size_of;
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct MinMaxAvgScalarBinBatch { pub struct MinMaxAvgScalarBinBatch {
ts1s: Vec<u64>, pub ts1s: Vec<u64>,
ts2s: Vec<u64>, pub ts2s: Vec<u64>,
counts: Vec<u64>, pub counts: Vec<u64>,
mins: Vec<f32>, pub mins: Vec<f32>,
maxs: Vec<f32>, pub maxs: Vec<f32>,
avgs: Vec<f32>, pub avgs: Vec<f32>,
} }
impl MinMaxAvgScalarBinBatch { impl MinMaxAvgScalarBinBatch {
@@ -31,14 +31,6 @@ impl MinMaxAvgScalarBinBatch {
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.ts1s.len() self.ts1s.len()
} }
pub fn push_single(&mut self, g: &MinMaxAvgScalarBinSingle) {
self.ts1s.push(g.ts1);
self.ts2s.push(g.ts2);
self.counts.push(g.count);
self.mins.push(g.min);
self.maxs.push(g.max);
self.avgs.push(g.avg);
}
pub fn from_full_frame(buf: &Bytes) -> Self { pub fn from_full_frame(buf: &Bytes) -> Self {
info!("MinMaxAvgScalarBinBatch construct from full frame len {}", buf.len()); info!("MinMaxAvgScalarBinBatch construct from full frame len {}", buf.len());
assert!(buf.len() >= 4); assert!(buf.len() >= 4);
@@ -191,36 +183,92 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch {
} }
impl AggregatableTdim for MinMaxAvgScalarBinBatch { impl AggregatableTdim for MinMaxAvgScalarBinBatch {
type Output = MinMaxAvgScalarBinSingle; type Output = MinMaxAvgScalarBinBatch;
type Aggregator = MinMaxAvgScalarBinBatchAggregator; type Aggregator = MinMaxAvgScalarBinBatchAggregator;
fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator { fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
todo!() MinMaxAvgScalarBinBatchAggregator::new(ts1, ts2)
} }
} }
pub struct MinMaxAvgScalarBinBatchAggregator {} pub struct MinMaxAvgScalarBinBatchAggregator {
ts1: u64,
ts2: u64,
count: u64,
min: f32,
max: f32,
sum: f32,
}
impl MinMaxAvgScalarBinBatchAggregator {
pub fn new(ts1: u64, ts2: u64) -> Self {
Self {
ts1,
ts2,
min: f32::MAX,
max: f32::MIN,
sum: 0f32,
count: 0,
}
}
}
impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
type InputValue = MinMaxAvgScalarBinBatch; type InputValue = MinMaxAvgScalarBinBatch;
type OutputValue = MinMaxAvgScalarBinSingle; type OutputValue = MinMaxAvgScalarBinBatch;
fn ends_before(&self, _inp: &Self::InputValue) -> bool { fn ends_before(&self, inp: &Self::InputValue) -> bool {
todo!() match inp.ts2s.last() {
Some(&ts) => ts <= self.ts1,
None => true,
}
} }
fn ends_after(&self, _inp: &Self::InputValue) -> bool { fn ends_after(&self, inp: &Self::InputValue) -> bool {
todo!() match inp.ts2s.last() {
Some(&ts) => ts >= self.ts2,
_ => panic!(),
}
} }
fn starts_after(&self, _inp: &Self::InputValue) -> bool { fn starts_after(&self, inp: &Self::InputValue) -> bool {
todo!() match inp.ts1s.first() {
Some(&ts) => ts >= self.ts2,
_ => panic!(),
}
} }
fn ingest(&mut self, _v: &Self::InputValue) { fn ingest(&mut self, v: &Self::InputValue) {
todo!() for i1 in 0..v.ts1s.len() {
let ts1 = v.ts1s[i1];
let ts2 = v.ts2s[i1];
if ts2 <= self.ts1 {
continue;
} else if ts1 >= self.ts2 {
continue;
} else {
self.min = self.min.min(v.mins[i1]);
self.max = self.max.max(v.maxs[i1]);
self.sum += v.avgs[i1];
self.count += 1;
}
}
} }
fn result(self) -> Self::OutputValue { fn result(self) -> Self::OutputValue {
todo!() let min = if self.min == f32::MAX { f32::NAN } else { self.min };
let max = if self.max == f32::MIN { f32::NAN } else { self.max };
let avg = if self.count == 0 {
f32::NAN
} else {
self.sum / self.count as f32
};
MinMaxAvgScalarBinBatch {
ts1s: vec![self.ts1],
ts2s: vec![self.ts2],
counts: vec![self.count],
mins: vec![min],
maxs: vec![max],
avgs: vec![avg],
}
} }
} }

View File

@@ -58,16 +58,7 @@ impl BinnedStream {
} }
}) })
.map(|k| k) .map(|k| k)
.into_binned_t(range) .into_binned_t(range);
.map(|k| match k {
Ok(k) => {
// TODO instead of converting, let binner already return batches.
let mut ret = MinMaxAvgScalarBinBatch::empty();
ret.push_single(&k);
Ok(ret)
}
Err(e) => Err(e),
});
Self { inp: Box::pin(inp) } Self { inp: Box::pin(inp) }
} }
} }

View File

@@ -73,13 +73,12 @@ pub async fn binned_bytes_for_http(
let channel_config = read_local_config(&query.channel, node).await?; let channel_config = read_local_config(&query.channel, node).await?;
let entry = extract_matching_config_entry(range, &channel_config); let entry = extract_matching_config_entry(range, &channel_config);
info!("found config entry {:?}", entry); info!("found config entry {:?}", entry);
let pre_range = PreBinnedPatchRange::covering_range(query.range.clone(), query.count); let range = BinnedRange::covering_range(range.clone(), query.count)
match pre_range { .ok_or(Error::with_msg(format!("BinnedRange::covering_range returned None")))?;
match PreBinnedPatchRange::covering_range(query.range.clone(), query.count) {
Some(pre_range) => { Some(pre_range) => {
info!("Found pre_range: {:?}", pre_range); info!("Found pre_range: {:?}", pre_range);
let range = BinnedRange::covering_range(range.clone(), query.count) if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() {
.ok_or(Error::with_msg(format!("BinnedRange::covering_range returned None")))?;
if range.grid_spec.bin_t_len() > pre_range.grid_spec.bin_t_len() {
let msg = format!( let msg = format!(
"binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}", "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}",
pre_range, range pre_range, range

39
disk/src/cache/pbv.rs vendored
View File

@@ -7,13 +7,13 @@ use crate::raw::EventsQuery;
use bytes::Bytes; use bytes::Bytes;
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
use futures_util::{FutureExt, StreamExt, TryStreamExt}; use futures_util::{FutureExt, StreamExt};
use netpod::log::*; use netpod::log::*;
use netpod::{ use netpod::{
AggKind, BinnedRange, Channel, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, AggKind, BinnedRange, Channel, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator,
PreBinnedPatchRange, PreBinnedPatchRange,
}; };
use std::future::{ready, Future}; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@@ -143,40 +143,7 @@ impl PreBinnedValueStream {
} }
k k
}) })
.into_binned_t(range) .into_binned_t(range);
.map_ok({
let mut a = MinMaxAvgScalarBinBatch::empty();
move |k| {
a.push_single(&k);
if a.len() > 0 {
let z = std::mem::replace(&mut a, MinMaxAvgScalarBinBatch::empty());
Some(z)
} else {
None
}
}
})
.filter_map(|k| {
let g = match k {
Ok(Some(k)) => Some(Ok(k)),
Ok(None) => None,
Err(e) => Some(Err(e)),
};
ready(g)
})
.take_while({
let mut run = true;
move |k| {
if !run {
ready(false)
} else {
if k.is_err() {
run = false;
}
ready(true)
}
}
});
self.fut2 = Some(Box::pin(s2)); self.fut2 = Some(Box::pin(s2));
} }
} }

View File

@@ -311,6 +311,7 @@ pub struct PreBinnedPatchRange {
} }
impl PreBinnedPatchRange { impl PreBinnedPatchRange {
/// Cover at least the given range with at least as many as the requested number of bins.
pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option<Self> { pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option<Self> {
assert!(min_bin_count >= 1); assert!(min_bin_count >= 1);
assert!(min_bin_count <= 2000); assert!(min_bin_count <= 2000);

View File

@@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
use disk::frame::inmem::InMemoryFrameAsyncReadStream; use disk::frame::inmem::InMemoryFrameAsyncReadStream;
use err::Error; use err::Error;
use futures_util::TryStreamExt; use futures_util::TryStreamExt;
use http::StatusCode;
use hyper::Body; use hyper::Body;
use netpod::log::*; use netpod::log::*;
@@ -35,6 +36,10 @@ pub async fn get_binned(
let client = hyper::Client::new(); let client = hyper::Client::new();
let res = client.request(req).await?; let res = client.request(req).await?;
info!("client response {:?}", res); info!("client response {:?}", res);
if res.status() != StatusCode::OK {
error!("Server error");
return Err(Error::with_msg(format!("Server error")));
}
//let (res_head, mut res_body) = res.into_parts(); //let (res_head, mut res_body) = res.into_parts();
let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);
let s2 = InMemoryFrameAsyncReadStream::new(s1); let s2 = InMemoryFrameAsyncReadStream::new(s1);