From 8ca0e1d340c62cf6eea0930025dd79f9d9ec9965 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 29 Apr 2021 11:41:58 +0200 Subject: [PATCH] Refactor and remove unused code --- disk/src/agg.rs | 101 ------------------------------- disk/src/agg/binnedt.rs | 3 +- disk/src/agg/eventbatch.rs | 24 ++++---- disk/src/agg/scalarbinbatch.rs | 106 ++++++++++++++++++++++++--------- disk/src/binnedstream.rs | 11 +--- disk/src/cache.rs | 9 ++- disk/src/cache/pbv.rs | 39 +----------- netpod/src/lib.rs | 1 + retrieval/src/client.rs | 5 ++ 9 files changed, 104 insertions(+), 195 deletions(-) diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 0d2284a..d36f3e9 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -40,46 +40,6 @@ pub trait AggregatableTdim { fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator; } -/// DO NOT USE. This is just a dummy for some testing. -impl AggregatableXdim1Bin for () { - type Output = (); - fn into_agg(self) -> Self::Output { - todo!() - } -} -/// DO NOT USE. This is just a dummy for some testing. -impl AggregatableTdim for () { - type Output = (); - type Aggregator = (); - fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator { - todo!() - } -} -/// DO NOT USE. This is just a dummy for some testing. -impl AggregatorTdim for () { - type InputValue = (); - type OutputValue = (); - - fn ends_before(&self, _inp: &Self::InputValue) -> bool { - todo!() - } - - fn ends_after(&self, _inp: &Self::InputValue) -> bool { - todo!() - } - - fn starts_after(&self, _inp: &Self::InputValue) -> bool { - todo!() - } - - fn ingest(&mut self, _v: &Self::InputValue) { - todo!() - } - fn result(self) -> Self::OutputValue { - todo!() - } -} - /// Batch of events with a scalar (zero dimensions) numeric value. pub struct ValuesDim0 { tss: Vec, @@ -217,67 +177,6 @@ pub trait FitsInside { fn fits_inside(&self, range: NanoRange) -> Fits; } -pub struct MinMaxAvgScalarBinSingle { - ts1: u64, - ts2: u64, - count: u64, - min: f32, - max: f32, - avg: f32, -} - -impl std::fmt::Debug for MinMaxAvgScalarBinSingle { - fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - fmt, - "MinMaxAvgScalarBinSingle ts1 {} ts2 {} count {} min {:7.2e} max {:7.2e} avg {:7.2e}", - self.ts1, self.ts2, self.count, self.min, self.max, self.avg - ) - } -} - -impl AggregatableTdim for MinMaxAvgScalarBinSingle { - type Output = MinMaxAvgScalarBinSingle; - type Aggregator = MinMaxAvgScalarBinSingleAggregator; - fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator { - todo!() - } -} - -impl AggregatableXdim1Bin for MinMaxAvgScalarBinSingle { - type Output = MinMaxAvgScalarBinSingle; - fn into_agg(self) -> Self::Output { - self - } -} - -pub struct MinMaxAvgScalarBinSingleAggregator {} - -impl AggregatorTdim for MinMaxAvgScalarBinSingleAggregator { - type InputValue = MinMaxAvgScalarBinSingle; - type OutputValue = MinMaxAvgScalarBinSingle; - - fn ends_before(&self, _inp: &Self::InputValue) -> bool { - todo!() - } - - fn ends_after(&self, _inp: &Self::InputValue) -> bool { - todo!() - } - - fn starts_after(&self, _inp: &Self::InputValue) -> bool { - todo!() - } - - fn ingest(&mut self, _v: &Self::InputValue) { - todo!() - } - - fn result(self) -> Self::OutputValue { - todo!() - } -} - pub struct Dim0F32Stream where S: Stream>, diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 495b6cf..c4e09a1 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -71,7 +71,7 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; if self.completed { - panic!("MergedFromRemotes ✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗✗ poll_next on completed"); + panic!("IntoBinnedTDefaultStream poll_next on completed"); } if self.errored { self.completed = true; @@ -79,7 +79,6 @@ where } 'outer: loop { let cur = if let Some(k) = self.left.take() { - trace!("IntoBinnedTDefaultStream USE LEFTOVER"); k } else if self.inp_completed { Ready(None) diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 19b76e4..931745c 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -1,5 +1,5 @@ use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; -use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, MinMaxAvgScalarBinSingle}; +use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim}; use bytes::{BufMut, Bytes, BytesMut}; use netpod::log::*; use netpod::timeunits::SEC; @@ -159,25 +159,25 @@ impl MinMaxAvgScalarEventBatchAggregator { impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { type InputValue = MinMaxAvgScalarEventBatch; - type OutputValue = MinMaxAvgScalarBinSingle; + type OutputValue = MinMaxAvgScalarBinBatch; fn ends_before(&self, inp: &Self::InputValue) -> bool { match inp.tss.last() { - Some(ts) => *ts < self.ts1, + Some(&ts) => ts < self.ts1, None => true, } } fn ends_after(&self, inp: &Self::InputValue) -> bool { match inp.tss.last() { - Some(ts) => *ts >= self.ts2, + Some(&ts) => ts >= self.ts2, _ => panic!(), } } fn starts_after(&self, inp: &Self::InputValue) -> bool { match inp.tss.first() { - Some(ts) => *ts >= self.ts2, + Some(&ts) => ts >= self.ts2, _ => panic!(), } } @@ -235,13 +235,13 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { } else { self.sum / self.count as f32 }; - MinMaxAvgScalarBinSingle { - ts1: self.ts1, - ts2: self.ts2, - count: self.count, - min, - max, - avg, + MinMaxAvgScalarBinBatch { + ts1s: vec![self.ts1], + ts2s: vec![self.ts2], + counts: vec![self.count], + mins: vec![min], + maxs: vec![max], + avgs: vec![avg], } } } diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index 18fb703..7b51605 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -1,4 +1,4 @@ -use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside, MinMaxAvgScalarBinSingle}; +use crate::agg::{AggregatableTdim, AggregatableXdim1Bin, AggregatorTdim, Fits, FitsInside}; use bytes::{BufMut, Bytes, BytesMut}; use netpod::log::*; use netpod::timeunits::SEC; @@ -9,12 +9,12 @@ use std::mem::size_of; #[allow(dead_code)] #[derive(Serialize, Deserialize)] pub struct MinMaxAvgScalarBinBatch { - ts1s: Vec, - ts2s: Vec, - counts: Vec, - mins: Vec, - maxs: Vec, - avgs: Vec, + pub ts1s: Vec, + pub ts2s: Vec, + pub counts: Vec, + pub mins: Vec, + pub maxs: Vec, + pub avgs: Vec, } impl MinMaxAvgScalarBinBatch { @@ -31,14 +31,6 @@ impl MinMaxAvgScalarBinBatch { pub fn len(&self) -> usize { self.ts1s.len() } - pub fn push_single(&mut self, g: &MinMaxAvgScalarBinSingle) { - self.ts1s.push(g.ts1); - self.ts2s.push(g.ts2); - self.counts.push(g.count); - self.mins.push(g.min); - self.maxs.push(g.max); - self.avgs.push(g.avg); - } pub fn from_full_frame(buf: &Bytes) -> Self { info!("MinMaxAvgScalarBinBatch construct from full frame len {}", buf.len()); assert!(buf.len() >= 4); @@ -191,36 +183,92 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch { } impl AggregatableTdim for MinMaxAvgScalarBinBatch { - type Output = MinMaxAvgScalarBinSingle; + type Output = MinMaxAvgScalarBinBatch; type Aggregator = MinMaxAvgScalarBinBatchAggregator; - fn aggregator_new_static(_ts1: u64, _ts2: u64) -> Self::Aggregator { - todo!() + fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { + MinMaxAvgScalarBinBatchAggregator::new(ts1, ts2) } } -pub struct MinMaxAvgScalarBinBatchAggregator {} +pub struct MinMaxAvgScalarBinBatchAggregator { + ts1: u64, + ts2: u64, + count: u64, + min: f32, + max: f32, + sum: f32, +} + +impl MinMaxAvgScalarBinBatchAggregator { + pub fn new(ts1: u64, ts2: u64) -> Self { + Self { + ts1, + ts2, + min: f32::MAX, + max: f32::MIN, + sum: 0f32, + count: 0, + } + } +} impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { type InputValue = MinMaxAvgScalarBinBatch; - type OutputValue = MinMaxAvgScalarBinSingle; + type OutputValue = MinMaxAvgScalarBinBatch; - fn ends_before(&self, _inp: &Self::InputValue) -> bool { - todo!() + fn ends_before(&self, inp: &Self::InputValue) -> bool { + match inp.ts2s.last() { + Some(&ts) => ts <= self.ts1, + None => true, + } } - fn ends_after(&self, _inp: &Self::InputValue) -> bool { - todo!() + fn ends_after(&self, inp: &Self::InputValue) -> bool { + match inp.ts2s.last() { + Some(&ts) => ts >= self.ts2, + _ => panic!(), + } } - fn starts_after(&self, _inp: &Self::InputValue) -> bool { - todo!() + fn starts_after(&self, inp: &Self::InputValue) -> bool { + match inp.ts1s.first() { + Some(&ts) => ts >= self.ts2, + _ => panic!(), + } } - fn ingest(&mut self, _v: &Self::InputValue) { - todo!() + fn ingest(&mut self, v: &Self::InputValue) { + for i1 in 0..v.ts1s.len() { + let ts1 = v.ts1s[i1]; + let ts2 = v.ts2s[i1]; + if ts2 <= self.ts1 { + continue; + } else if ts1 >= self.ts2 { + continue; + } else { + self.min = self.min.min(v.mins[i1]); + self.max = self.max.max(v.maxs[i1]); + self.sum += v.avgs[i1]; + self.count += 1; + } + } } fn result(self) -> Self::OutputValue { - todo!() + let min = if self.min == f32::MAX { f32::NAN } else { self.min }; + let max = if self.max == f32::MIN { f32::NAN } else { self.max }; + let avg = if self.count == 0 { + f32::NAN + } else { + self.sum / self.count as f32 + }; + MinMaxAvgScalarBinBatch { + ts1s: vec![self.ts1], + ts2s: vec![self.ts2], + counts: vec![self.count], + mins: vec![min], + maxs: vec![max], + avgs: vec![avg], + } } } diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index d9232c5..5be1576 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -58,16 +58,7 @@ impl BinnedStream { } }) .map(|k| k) - .into_binned_t(range) - .map(|k| match k { - Ok(k) => { - // TODO instead of converting, let binner already return batches. - let mut ret = MinMaxAvgScalarBinBatch::empty(); - ret.push_single(&k); - Ok(ret) - } - Err(e) => Err(e), - }); + .into_binned_t(range); Self { inp: Box::pin(inp) } } } diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 0a61ec2..830f7ed 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -73,13 +73,12 @@ pub async fn binned_bytes_for_http( let channel_config = read_local_config(&query.channel, node).await?; let entry = extract_matching_config_entry(range, &channel_config); info!("found config entry {:?}", entry); - let pre_range = PreBinnedPatchRange::covering_range(query.range.clone(), query.count); - match pre_range { + let range = BinnedRange::covering_range(range.clone(), query.count) + .ok_or(Error::with_msg(format!("BinnedRange::covering_range returned None")))?; + match PreBinnedPatchRange::covering_range(query.range.clone(), query.count) { Some(pre_range) => { info!("Found pre_range: {:?}", pre_range); - let range = BinnedRange::covering_range(range.clone(), query.count) - .ok_or(Error::with_msg(format!("BinnedRange::covering_range returned None")))?; - if range.grid_spec.bin_t_len() > pre_range.grid_spec.bin_t_len() { + if range.grid_spec.bin_t_len() < pre_range.grid_spec.bin_t_len() { let msg = format!( "binned_bytes_for_http incompatible ranges:\npre_range: {:?}\nrange: {:?}", pre_range, range diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 5458ce9..15f9f23 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -7,13 +7,13 @@ use crate::raw::EventsQuery; use bytes::Bytes; use err::Error; use futures_core::Stream; -use futures_util::{FutureExt, StreamExt, TryStreamExt}; +use futures_util::{FutureExt, StreamExt}; use netpod::log::*; use netpod::{ AggKind, BinnedRange, Channel, NanoRange, NodeConfig, PreBinnedPatchCoord, PreBinnedPatchIterator, PreBinnedPatchRange, }; -use std::future::{ready, Future}; +use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -143,40 +143,7 @@ impl PreBinnedValueStream { } k }) - .into_binned_t(range) - .map_ok({ - let mut a = MinMaxAvgScalarBinBatch::empty(); - move |k| { - a.push_single(&k); - if a.len() > 0 { - let z = std::mem::replace(&mut a, MinMaxAvgScalarBinBatch::empty()); - Some(z) - } else { - None - } - } - }) - .filter_map(|k| { - let g = match k { - Ok(Some(k)) => Some(Ok(k)), - Ok(None) => None, - Err(e) => Some(Err(e)), - }; - ready(g) - }) - .take_while({ - let mut run = true; - move |k| { - if !run { - ready(false) - } else { - if k.is_err() { - run = false; - } - ready(true) - } - } - }); + .into_binned_t(range); self.fut2 = Some(Box::pin(s2)); } } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 43668a2..280a417 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -311,6 +311,7 @@ pub struct PreBinnedPatchRange { } impl PreBinnedPatchRange { + /// Cover at least the given range with at least as many as the requested number of bins. pub fn covering_range(range: NanoRange, min_bin_count: u64) -> Option { assert!(min_bin_count >= 1); assert!(min_bin_count <= 2000); diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs index 0916194..f54b0be 100644 --- a/retrieval/src/client.rs +++ b/retrieval/src/client.rs @@ -2,6 +2,7 @@ use chrono::{DateTime, Utc}; use disk::frame::inmem::InMemoryFrameAsyncReadStream; use err::Error; use futures_util::TryStreamExt; +use http::StatusCode; use hyper::Body; use netpod::log::*; @@ -35,6 +36,10 @@ pub async fn get_binned( let client = hyper::Client::new(); let res = client.request(req).await?; info!("client response {:?}", res); + if res.status() != StatusCode::OK { + error!("Server error"); + return Err(Error::with_msg(format!("Server error"))); + } //let (res_head, mut res_body) = res.into_parts(); let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); let s2 = InMemoryFrameAsyncReadStream::new(s1);