Remove AggregatableXdim1Bin

This commit is contained in:
Dominik Werder
2021-06-09 20:54:53 +02:00
parent 38e17d440b
commit 9f90774328
3 changed files with 1 additions and 131 deletions

View File

@@ -23,14 +23,6 @@ pub mod eventbatch;
pub mod scalarbinbatch;
pub mod streams;
pub trait AggregatableXdim1Bin<SK>
where
SK: StreamKind,
{
type Output: AggregatableXdim1Bin<SK>;
fn into_agg(self) -> Self::Output;
}
/// Batch of events with a scalar (zero dimensions) numeric value.
pub struct ValuesDim0 {
tss: Vec<u64>,
@@ -49,60 +41,6 @@ impl std::fmt::Debug for ValuesDim0 {
}
}
// TODO get rid of AggregatableXdim1Bin and ValuesDim1
impl<SK> AggregatableXdim1Bin<SK> for ValuesDim1
where
SK: StreamKind,
{
type Output = MinMaxAvgScalarEventBatch;
fn into_agg(self) -> Self::Output {
let mut ret = MinMaxAvgScalarEventBatch {
tss: Vec::with_capacity(self.tss.len()),
mins: Vec::with_capacity(self.tss.len()),
maxs: Vec::with_capacity(self.tss.len()),
avgs: Vec::with_capacity(self.tss.len()),
};
for i1 in 0..self.tss.len() {
let ts = self.tss[i1];
let mut min = f32::MAX;
let mut max = f32::MIN;
let mut sum = f32::NAN;
let mut count = 0;
let vals = &self.values[i1];
for i2 in 0..vals.len() {
let v = vals[i2];
min = min.min(v);
max = max.max(v);
if v.is_nan() {
} else {
if sum.is_nan() {
sum = v;
} else {
sum += v;
}
count += 1;
}
}
if min == f32::MAX {
min = f32::NAN;
}
if max == f32::MIN {
max = f32::NAN;
}
ret.tss.push(ts);
ret.mins.push(min);
ret.maxs.push(max);
if sum.is_nan() {
ret.avgs.push(sum);
} else {
ret.avgs.push(sum / count as f32);
}
}
ret
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ValuesExtractStats {
pub dur: Duration,
@@ -147,53 +85,6 @@ impl std::fmt::Debug for ValuesDim1 {
}
}
impl<SK> AggregatableXdim1Bin<SK> for ValuesDim0
where
SK: StreamKind,
{
type Output = MinMaxAvgScalarEventBatch;
fn into_agg(self) -> Self::Output {
let mut ret = MinMaxAvgScalarEventBatch {
tss: Vec::with_capacity(self.tss.len()),
mins: Vec::with_capacity(self.tss.len()),
maxs: Vec::with_capacity(self.tss.len()),
avgs: Vec::with_capacity(self.tss.len()),
};
// TODO stats are not yet in ValuesDim0
err::todoval::<u32>();
//if self.range_complete_observed {
// ret.range_complete_observed = true;
//}
for i1 in 0..self.tss.len() {
let ts = self.tss[i1];
let mut min = f32::MAX;
let mut max = f32::MIN;
let mut sum = 0f32;
let vals = &self.values[i1];
assert!(vals.len() > 0);
for i2 in 0..vals.len() {
let v = vals[i2];
//info!("value {} {} {}", i1, i2, v);
min = min.min(v);
max = max.max(v);
sum += v;
}
if min == f32::MAX {
min = f32::NAN;
}
if max == f32::MIN {
max = f32::NAN;
}
ret.tss.push(ts);
ret.mins.push(min);
ret.maxs.push(max);
ret.avgs.push(sum / vals.len() as f32);
}
ret
}
}
pub enum Fits {
Empty,
Lower,

View File

@@ -1,6 +1,5 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::{Appendable, StreamItem};
use crate::agg::AggregatableXdim1Bin;
use crate::binned::{MakeBytesFrame, RangeCompletableItem, RangeOverlapInfo, StreamKind};
use crate::frame::makeframe::make_frame;
use bytes::{BufMut, Bytes, BytesMut};
@@ -100,16 +99,6 @@ impl std::fmt::Debug for MinMaxAvgScalarEventBatch {
}
}
impl<SK> AggregatableXdim1Bin<SK> for MinMaxAvgScalarEventBatch
where
SK: StreamKind,
{
type Output = MinMaxAvgScalarEventBatch;
fn into_agg(self) -> Self::Output {
self
}
}
impl MinMaxAvgScalarEventBatch {
#[allow(dead_code)]
fn old_serialized(&self) -> Bytes {

View File

@@ -1,5 +1,5 @@
use crate::agg::streams::{Appendable, StreamItem, ToJsonBytes};
use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
use crate::agg::{Fits, FitsInside};
use crate::binned::{MakeBytesFrame, RangeCompletableItem, StreamKind};
use crate::frame::makeframe::make_frame;
use bytes::{BufMut, Bytes, BytesMut};
@@ -184,16 +184,6 @@ impl MinMaxAvgScalarBinBatch {
}
}
impl<SK> AggregatableXdim1Bin<SK> for MinMaxAvgScalarBinBatch
where
SK: StreamKind,
{
type Output = MinMaxAvgScalarBinBatch;
fn into_agg(self) -> Self::Output {
todo!()
}
}
pub struct MinMaxAvgScalarBinBatchAggregator {
ts1: u64,
ts2: u64,