From 19ef985625b4533a72b670a378ab8b2d04fdf00c Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 9 Jun 2021 20:21:24 +0200 Subject: [PATCH] Remove traits EventsTimeBinner and BinsTimeBinner --- disk/src/agg.rs | 1 - disk/src/agg/binnedt.rs | 1 - disk/src/agg/binnedt4.rs | 120 +------------------------------ disk/src/binned.rs | 70 +----------------- disk/src/binned/binnedfrompbv.rs | 2 +- disk/src/binned/pbv.rs | 4 +- disk/src/binned/prebinned.rs | 9 +-- 7 files changed, 8 insertions(+), 199 deletions(-) delete mode 100644 disk/src/agg/binnedt.rs diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 990927c..9eb9890 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -17,7 +17,6 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -pub mod binnedt; pub mod binnedt2; pub mod binnedt3; pub mod binnedt4; diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs deleted file mode 100644 index 8b13789..0000000 --- a/disk/src/agg/binnedt.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/disk/src/agg/binnedt4.rs b/disk/src/agg/binnedt4.rs index affb846..d9f4912 100644 --- a/disk/src/agg/binnedt4.rs +++ b/disk/src/agg/binnedt4.rs @@ -3,8 +3,8 @@ use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::streams::{Appendable, StreamItem}; use crate::binned::{ - BinsTimeBinner, EventsTimeBinner, EventsTimeBinnerAggregator, FilterFittingInside, MinMaxAvgAggregator, - MinMaxAvgBins, NumOps, RangeCompletableItem, RangeOverlapInfo, ReadPbv, ReadableFromFile, SingleXBinAggregator, + FilterFittingInside, MinMaxAvgAggregator, MinMaxAvgBins, NumOps, RangeCompletableItem, RangeOverlapInfo, ReadPbv, + ReadableFromFile, SingleXBinAggregator, }; use crate::decode::{EventValues, MinMaxAvgScalarEventBatchGen}; use crate::frame::makeframe::Framable; @@ -21,131 +21,15 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::fs::File; -// TODO no longer needed -pub struct DefaultScalarEventsTimeBinner { - _m1: PhantomData, -} - -impl EventsTimeBinner for DefaultScalarEventsTimeBinner -where - NTY: NumOps, -{ - type Input = EventValues; - type Output = MinMaxAvgBins; - type Aggregator = MinMaxAvgAggregator; - - fn aggregator(range: NanoRange) -> Self::Aggregator { - Self::Aggregator::new(range) - } -} - -// TODO no longer needed -pub struct DefaultSingleXBinTimeBinner { - _m1: PhantomData, -} - -impl EventsTimeBinner for DefaultSingleXBinTimeBinner -where - NTY: NumOps, -{ - type Input = XBinnedScalarEvents; - // TODO is that output type good enough for now? Maybe better with a new one also - // to distinguish from the earlier one. - type Output = MinMaxAvgBins; - type Aggregator = SingleXBinAggregator; - - fn aggregator(range: NanoRange) -> Self::Aggregator { - Self::Aggregator::new(range) - } -} - pub struct DefaultBinsTimeBinner { _m1: PhantomData, } -impl BinsTimeBinner for DefaultBinsTimeBinner -where - NTY: NumOps, -{ - type Input = MinMaxAvgBins; - type Output = MinMaxAvgBins; - - fn process(inp: Self::Input) -> Self::Output { - todo!() - } -} - pub trait Aggregator3Tdim { type InputValue; type OutputValue; } -pub struct Agg3 { - range: NanoRange, - count: u64, - min: f32, - max: f32, - sum: f32, - sumc: u64, -} - -impl Agg3 { - fn new(range: NanoRange) -> Self { - Self { - range, - count: 0, - min: f32::MAX, - max: f32::MIN, - sum: f32::NAN, - sumc: 0, - } - } - - fn ingest(&mut self, item: &MinMaxAvgScalarEventBatch) { - for i1 in 0..item.tss.len() { - let ts = item.tss[i1]; - if ts < self.range.beg { - continue; - } else if ts >= self.range.end { - continue; - } else { - self.count += 1; - self.min = self.min.min(item.mins[i1]); - self.max = self.max.max(item.maxs[i1]); - let x = item.avgs[i1]; - if x.is_nan() { - } else { - if self.sum.is_nan() { - self.sum = x; - } else { - self.sum += x; - } - self.sumc += 1; - } - } - } - } - - fn result(self) -> Vec { - let min = if self.min == f32::MAX { f32::NAN } else { self.min }; - let max = if self.max == f32::MIN { f32::NAN } else { self.max }; - let avg = if self.sumc == 0 { - f32::NAN - } else { - self.sum / self.sumc as f32 - }; - let v = MinMaxAvgScalarBinBatch { - ts1s: vec![self.range.beg], - ts2s: vec![self.range.end], - counts: vec![self.count], - mins: vec![min], - maxs: vec![max], - avgs: vec![avg], - }; - vec![v] - } -} - pub trait TimeBinnableTypeAggregator: Send { type Input: TimeBinnableType; type Output: TimeBinnableType; diff --git a/disk/src/binned.rs b/disk/src/binned.rs index 2fe0bf6..e5b8dc6 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -1,9 +1,6 @@ use crate::agg::binnedt2::AggregatableTdim2; use crate::agg::binnedt3::{Agg3, BinnedT3Stream}; -use crate::agg::binnedt4::{ - DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner, TBinnerStream, TimeBinnableType, - TimeBinnableTypeAggregator, -}; +use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggregator}; use crate::agg::enp::{Identity, WaveXBinner, XBinnedScalarEvents}; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; @@ -770,29 +767,6 @@ pub trait TimeBins: Send + Unpin + WithLen + Appendable + FilterFittingInside { fn ts2s(&self) -> &Vec; } -// TODO remove in favor of the one in binnedt4 -pub trait EventsTimeBinner: Send + Unpin { - type Input: Unpin + RangeOverlapInfo; - type Output: TimeBins; - type Aggregator: EventsTimeBinnerAggregator + Unpin; - fn aggregator(range: NanoRange) -> Self::Aggregator; -} - -// TODO remove in favor of the one in binnedt4 -pub trait EventsTimeBinnerAggregator: Send { - type Input: Unpin; - type Output: Unpin; - fn range(&self) -> &NanoRange; - fn ingest(&mut self, item: &Self::Input); - fn result(self) -> Self::Output; -} - -pub trait BinsTimeBinner { - type Input: TimeBins; - type Output: TimeBins; - fn process(inp: Self::Input) -> Self::Output; -} - #[derive(Clone, Serialize, Deserialize)] pub struct MinMaxAvgBins { pub ts1s: Vec, @@ -1120,27 +1094,6 @@ where } } -// TODO after refactor get rid of this impl: -impl EventsTimeBinnerAggregator for MinMaxAvgAggregator -where - NTY: NumOps, -{ - type Input = EventValues; - type Output = MinMaxAvgBins; - - fn range(&self) -> &NanoRange { - &self.range - } - - fn ingest(&mut self, item: &Self::Input) { - todo!() - } - - fn result(self) -> Self::Output { - todo!() - } -} - pub struct MinMaxAvgBinsAggregator { range: NanoRange, count: u32, @@ -1262,27 +1215,6 @@ impl SingleXBinAggregator { } } -impl EventsTimeBinnerAggregator for SingleXBinAggregator -where - NTY: NumOps, -{ - type Input = XBinnedScalarEvents; - // TODO do I need another type to carry the x-bin count as well? No xbincount is static anyways. - type Output = MinMaxAvgBins; - - fn range(&self) -> &NanoRange { - &self.range - } - - fn ingest(&mut self, item: &Self::Input) { - todo!() - } - - fn result(self) -> Self::Output { - todo!() - } -} - pub trait StreamKind: Clone + Unpin + Send + Sync + 'static { type TBinnedStreamType: Stream>, Error>> + Send; type XBinnedEvents: XBinnedEvents; diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index 817f78a..f9332df 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -1,7 +1,7 @@ use crate::agg::binnedt4::{TBinnerStream, TimeBinnableType, TimeBinnableTypeAggregator}; use crate::agg::streams::StreamItem; use crate::binned::query::{CacheUsage, PreBinnedQuery}; -use crate::binned::{EventsTimeBinner, RangeCompletableItem}; +use crate::binned::RangeCompletableItem; use crate::cache::{node_ix_for_patch, HttpBodyAsAsyncRead}; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::{decode_frame, FrameType}; diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index aa3afcd..8918448 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -3,8 +3,8 @@ use crate::agg::streams::{Appendable, StreamItem}; use crate::binned::binnedfrompbv::FetchedPreBinned; use crate::binned::query::{CacheUsage, PreBinnedQuery}; use crate::binned::{ - BinnedStreamKindScalar, BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, PushableIndex, - RangeCompletableItem, ReadableFromFile, StreamKind, WithLen, + BinnedStreamKindScalar, EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem, ReadableFromFile, + StreamKind, WithLen, }; use crate::cache::pbvfs::PreBinnedScalarValueFetchedStream; use crate::cache::{write_pb_cache_min_max_avg_scalar, CacheFileDesc, MergedFromRemotes, WrittenPbCache}; diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index fcfb2a6..afbcc7d 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -1,6 +1,4 @@ -use crate::agg::binnedt4::{ - DefaultBinsTimeBinner, DefaultScalarEventsTimeBinner, DefaultSingleXBinTimeBinner, TimeBinnableType, -}; +use crate::agg::binnedt4::{DefaultBinsTimeBinner, TimeBinnableType}; use crate::agg::enp::{Identity, WaveXBinner}; use crate::agg::streams::{Appendable, StreamItem}; // use crate::binned::pbv2::{ @@ -8,10 +6,7 @@ use crate::agg::streams::{Appendable, StreamItem}; // }; use crate::binned::pbv::PreBinnedValueStream; use crate::binned::query::PreBinnedQuery; -use crate::binned::{ - BinsTimeBinner, EventsNodeProcessor, EventsTimeBinner, NumOps, PushableIndex, RangeCompletableItem, - ReadableFromFile, StreamKind, -}; +use crate::binned::{EventsNodeProcessor, NumOps, PushableIndex, RangeCompletableItem, ReadableFromFile, StreamKind}; use crate::cache::node_ix_for_patch; use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case,