This commit is contained in:
Dominik Werder
2021-06-14 23:10:14 +02:00
parent bb6d853b78
commit bebce14f56
9 changed files with 881 additions and 67 deletions
+6 -4
View File
@@ -28,7 +28,7 @@ pub trait TimeBinnableType:
{
type Output: TimeBinnableType;
type Aggregator: TimeBinnableTypeAggregator<Input = Self, Output = Self::Output> + Send + Unpin;
fn aggregator(range: NanoRange) -> Self::Aggregator;
fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator;
}
pub struct TBinnerStream<S, TBT>
@@ -38,6 +38,7 @@ where
{
inp: Pin<Box<S>>,
spec: BinnedRange,
bin_count: usize,
curbin: u32,
left: Option<Poll<Option<Sitemty<TBT>>>>,
aggtor: Option<<TBT as TimeBinnableType>::Aggregator>,
@@ -55,14 +56,15 @@ where
S: Stream<Item = Sitemty<TBT>> + Send + Unpin + 'static,
TBT: TimeBinnableType,
{
pub fn new(inp: S, spec: BinnedRange) -> Self {
pub fn new(inp: S, spec: BinnedRange, bin_count: usize) -> Self {
let range = spec.get_range(0);
Self {
inp: Box::pin(inp),
spec,
bin_count,
curbin: 0,
left: None,
aggtor: Some(<TBT as TimeBinnableType>::aggregator(range)),
aggtor: Some(<TBT as TimeBinnableType>::aggregator(range, bin_count)),
tmp_agg_results: VecDeque::new(),
inp_completed: false,
all_bins_emitted: false,
@@ -90,7 +92,7 @@ where
let range = self.spec.get_range(self.curbin);
let ret = self
.aggtor
.replace(<TBT as TimeBinnableType>::aggregator(range))
.replace(<TBT as TimeBinnableType>::aggregator(range, self.bin_count))
.unwrap()
.result();
// TODO should we accumulate bins before emit? Maybe not, we want to stay responsive.
+317 -16
View File
@@ -3,12 +3,12 @@ use crate::agg::streams::Appendable;
use crate::agg::{Fits, FitsInside};
use crate::binned::dim1::MinMaxAvgDim1Bins;
use crate::binned::{
EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex, RangeOverlapInfo, ReadPbv,
ReadableFromFile, WithLen, WithTimestamps,
EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, MinMaxAvgWaveBins, NumOps, PushableIndex,
RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps,
};
use crate::decode::EventValues;
use err::Error;
use netpod::NanoRange;
use netpod::{NanoRange, Shape};
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use tokio::fs::File;
@@ -24,11 +24,16 @@ where
type Input = NTY;
type Output = EventValues<NTY>;
fn process(inp: EventValues<Self::Input>) -> Self::Output {
fn create(shape: Shape) -> Self {
Self { _m1: PhantomData }
}
fn process(&self, inp: EventValues<Self::Input>) -> Self::Output {
inp
}
}
// TODO rename Scalar -> Dim0
#[derive(Serialize, Deserialize)]
pub struct XBinnedScalarEvents<NTY> {
tss: Vec<u64>,
@@ -169,7 +174,7 @@ where
type Output = MinMaxAvgBins<NTY>;
type Aggregator = XBinnedScalarEventsAggregator<NTY>;
fn aggregator(range: NanoRange) -> Self::Aggregator {
fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
Self::Aggregator::new(range)
}
}
@@ -269,6 +274,241 @@ where
}
}
// TODO rename Wave -> Dim1
#[derive(Serialize, Deserialize)]
pub struct XBinnedWaveEvents<NTY> {
tss: Vec<u64>,
mins: Vec<Vec<NTY>>,
maxs: Vec<Vec<NTY>>,
avgs: Vec<Vec<f32>>,
}
impl<NTY> XBinnedWaveEvents<NTY> {
pub fn empty() -> Self {
Self {
tss: vec![],
mins: vec![],
maxs: vec![],
avgs: vec![],
}
}
}
impl<NTY> WithLen for XBinnedWaveEvents<NTY> {
fn len(&self) -> usize {
self.tss.len()
}
}
impl<NTY> WithTimestamps for XBinnedWaveEvents<NTY> {
fn ts(&self, ix: usize) -> u64 {
self.tss[ix]
}
}
impl<NTY> RangeOverlapInfo for XBinnedWaveEvents<NTY> {
fn ends_before(&self, range: NanoRange) -> bool {
match self.tss.last() {
Some(&ts) => ts < range.beg,
None => true,
}
}
fn ends_after(&self, range: NanoRange) -> bool {
match self.tss.last() {
Some(&ts) => ts >= range.end,
None => panic!(),
}
}
fn starts_after(&self, range: NanoRange) -> bool {
match self.tss.first() {
Some(&ts) => ts >= range.end,
None => panic!(),
}
}
}
impl<NTY> FitsInside for XBinnedWaveEvents<NTY> {
fn fits_inside(&self, range: NanoRange) -> Fits {
if self.tss.is_empty() {
Fits::Empty
} else {
let t1 = *self.tss.first().unwrap();
let t2 = *self.tss.last().unwrap();
if t2 < range.beg {
Fits::Lower
} else if t1 > range.end {
Fits::Greater
} else if t1 < range.beg && t2 > range.end {
Fits::PartlyLowerAndGreater
} else if t1 < range.beg {
Fits::PartlyLower
} else if t2 > range.end {
Fits::PartlyGreater
} else {
Fits::Inside
}
}
}
}
impl<NTY> FilterFittingInside for XBinnedWaveEvents<NTY> {
fn filter_fitting_inside(self, fit_range: NanoRange) -> Option<Self> {
match self.fits_inside(fit_range) {
Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self),
_ => None,
}
}
}
impl<NTY> PushableIndex for XBinnedWaveEvents<NTY>
where
NTY: NumOps,
{
fn push_index(&mut self, src: &Self, ix: usize) {
self.tss.push(src.tss[ix]);
self.mins.push(src.mins[ix]);
self.maxs.push(src.maxs[ix]);
self.avgs.push(src.avgs[ix]);
}
}
impl<NTY> Appendable for XBinnedWaveEvents<NTY>
where
NTY: NumOps,
{
fn empty() -> Self {
Self::empty()
}
fn append(&mut self, src: &Self) {
self.tss.extend_from_slice(&src.tss);
self.mins.extend_from_slice(&src.mins);
self.maxs.extend_from_slice(&src.maxs);
self.avgs.extend_from_slice(&src.avgs);
}
}
impl<NTY> ReadableFromFile for XBinnedWaveEvents<NTY>
where
NTY: NumOps,
{
fn read_from_file(_file: File) -> Result<ReadPbv<Self>, Error> {
// TODO refactor types such that this impl is not needed.
panic!()
}
fn from_buf(_buf: &[u8]) -> Result<Self, Error> {
panic!()
}
}
impl<NTY> TimeBinnableType for XBinnedWaveEvents<NTY>
where
NTY: NumOps,
{
type Output = MinMaxAvgWaveBins<NTY>;
type Aggregator = XBinnedWaveEventsAggregator<NTY>;
fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
Self::Aggregator::new(range, bin_count)
}
}
pub struct XBinnedWaveEventsAggregator<NTY>
where
NTY: NumOps,
{
range: NanoRange,
count: u64,
min: Vec<NTY>,
max: Vec<NTY>,
sum: Vec<f32>,
sumc: u64,
}
impl<NTY> XBinnedWaveEventsAggregator<NTY>
where
NTY: NumOps,
{
pub fn new(range: NanoRange, bin_count: usize) -> Self {
Self {
range,
count: 0,
min: vec![NTY::min_or_nan(); bin_count],
max: vec![NTY::max_or_nan(); bin_count],
sum: vec![0f32; bin_count],
sumc: 0,
}
}
}
impl<NTY> TimeBinnableTypeAggregator for XBinnedWaveEventsAggregator<NTY>
where
NTY: NumOps,
{
type Input = XBinnedWaveEvents<NTY>;
type Output = MinMaxAvgWaveBins<NTY>;
fn range(&self) -> &NanoRange {
&self.range
}
fn ingest(&mut self, item: &Self::Input) {
for i1 in 0..item.tss.len() {
let ts = item.tss[i1];
if ts < self.range.beg {
continue;
} else if ts >= self.range.end {
continue;
} else {
for (i2, v) in item.mins[i1].iter().enumerate() {
if *v < self.min[i2] || self.min[i2].is_nan() {
self.min[i2] = *v;
}
}
for (i2, v) in item.maxs[i1].iter().enumerate() {
if *v > self.max[i2] || self.max[i2].is_nan() {
self.max[i2] = *v;
}
}
for (i2, v) in item.avgs[i1].iter().enumerate() {
if v.is_nan() {
} else {
self.sum[i2] += v;
}
}
self.sumc += 1;
self.count += 1;
}
}
}
fn result(self) -> Self::Output {
if self.sumc == 0 {
Self::Output {
ts1s: vec![self.range.beg],
ts2s: vec![self.range.end],
counts: vec![self.count],
mins: vec![None],
maxs: vec![None],
avgs: vec![None],
}
} else {
let avg = self.sum.iter().map(|k| *k / self.sumc as f32).collect();
Self::Output {
ts1s: vec![self.range.beg],
ts2s: vec![self.range.end],
counts: vec![self.count],
mins: vec![Some(self.min)],
maxs: vec![Some(self.max)],
avgs: vec![Some(avg)],
}
}
}
}
#[derive(Serialize, Deserialize)]
pub struct WaveEvents<NTY> {
pub tss: Vec<u64>,
@@ -398,8 +638,8 @@ where
type Output = MinMaxAvgDim1Bins<NTY>;
type Aggregator = WaveEventsAggregator<NTY>;
fn aggregator(range: NanoRange) -> Self::Aggregator {
Self::Aggregator::new(range)
fn aggregator(range: NanoRange, bin_count: usize) -> Self::Aggregator {
Self::Aggregator::new(range, bin_count)
}
}
@@ -419,11 +659,12 @@ impl<NTY> WaveEventsAggregator<NTY>
where
NTY: NumOps,
{
pub fn new(range: NanoRange) -> Self {
pub fn new(range: NanoRange, bin_count: usize) -> Self {
Self {
range,
count: 0,
min: None,
// TODO create the right number of bins right here:
min: err::todoval(),
max: None,
sumc: 0,
sum: None,
@@ -525,9 +766,13 @@ where
type Input = Vec<NTY>;
type Output = XBinnedScalarEvents<NTY>;
fn process(inp: EventValues<Self::Input>) -> Self::Output {
fn create(shape: Shape) -> Self {
Self { _m1: PhantomData }
}
fn process(&self, inp: EventValues<Self::Input>) -> Self::Output {
let nev = inp.tss.len();
let mut ret = XBinnedScalarEvents {
let mut ret = Self::Output {
tss: inp.tss,
xbincount: Vec::with_capacity(nev),
mins: Vec::with_capacity(nev),
@@ -535,6 +780,8 @@ where
avgs: Vec::with_capacity(nev),
};
for i1 in 0..nev {
// TODO why do I work here with Option?
err::todo();
let mut min = None;
let mut max = None;
let mut sum = 0f32;
@@ -584,6 +831,7 @@ where
}
pub struct WaveNBinner<NTY> {
bin_count: usize,
_m1: PhantomData<NTY>,
}
@@ -592,11 +840,60 @@ where
NTY: NumOps,
{
type Input = Vec<NTY>;
// TODO need new container type for this case:
type Output = XBinnedScalarEvents<NTY>;
type Output = XBinnedWaveEvents<NTY>;
fn process(_inp: EventValues<Self::Input>) -> Self::Output {
err::todoval()
fn create(shape: Shape) -> Self {
// TODO get rid of panic potential
let bin_count = if let Shape::Wave(n) = shape { n } else { panic!() } as usize;
Self {
bin_count,
_m1: PhantomData,
}
}
fn process(&self, inp: EventValues<Self::Input>) -> Self::Output {
let nev = inp.tss.len();
let mut ret = Self::Output {
tss: inp.tss,
mins: Vec::with_capacity(nev),
maxs: Vec::with_capacity(nev),
avgs: Vec::with_capacity(nev),
};
for i1 in 0..nev {
let mut min = vec![NTY::min_or_nan(); self.bin_count];
let mut max = vec![NTY::max_or_nan(); self.bin_count];
let mut sum = vec![0f32; self.bin_count];
let mut sumc = vec![0; self.bin_count];
for (i2, &v) in inp.values[i1].iter().enumerate() {
let i3 = i2 * self.bin_count / inp.values[i1].len();
if v < min[i3] {
min[i3] = v;
}
if v > max[i3] {
max[i3] = v;
}
if v.is_nan() {
} else {
sum[i3] += v.as_();
sumc[i3] += 1;
}
}
ret.mins.push(min);
ret.maxs.push(max);
let avg = sum
.iter()
.enumerate()
.map(|(i3, &k)| {
if sumc[i3] > 0 {
sum[i3] / sumc[i3] as f32
} else {
f32::NAN
}
})
.collect();
ret.avgs.push(avg);
}
ret
}
}
@@ -611,7 +908,11 @@ where
type Input = Vec<NTY>;
type Output = WaveEvents<NTY>;
fn process(inp: EventValues<Self::Input>) -> Self::Output {
fn create(shape: Shape) -> Self {
Self { _m1: PhantomData }
}
fn process(&self, inp: EventValues<Self::Input>) -> Self::Output {
if false {
let n = if inp.values.len() > 0 { inp.values[0].len() } else { 0 };
let n = if n > 5 { 5 } else { n };