WIP not compile

This commit is contained in:
Dominik Werder
2021-05-26 09:08:44 +02:00
parent a76e86e623
commit 2b1be2f2b9
11 changed files with 235 additions and 117 deletions
+14 -5
View File
@@ -6,7 +6,7 @@ use super::eventchunker::EventFull;
use crate::agg::binnedt::AggregatableTdim; use crate::agg::binnedt::AggregatableTdim;
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch; use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::binned::RangeCompletableItem; use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use bytes::BytesMut; use bytes::BytesMut;
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
@@ -25,8 +25,11 @@ pub mod eventbatch;
pub mod scalarbinbatch; pub mod scalarbinbatch;
pub mod streams; pub mod streams;
pub trait AggregatableXdim1Bin { pub trait AggregatableXdim1Bin<SK>
type Output: AggregatableXdim1Bin + AggregatableTdim; where
SK: BinnedStreamKind,
{
type Output: AggregatableXdim1Bin<SK> + AggregatableTdim<SK>;
fn into_agg(self) -> Self::Output; fn into_agg(self) -> Self::Output;
} }
@@ -48,7 +51,10 @@ impl std::fmt::Debug for ValuesDim0 {
} }
} }
impl AggregatableXdim1Bin for ValuesDim1 { impl<SK> AggregatableXdim1Bin<SK> for ValuesDim1
where
SK: BinnedStreamKind,
{
type Output = MinMaxAvgScalarEventBatch; type Output = MinMaxAvgScalarEventBatch;
fn into_agg(self) -> Self::Output { fn into_agg(self) -> Self::Output {
@@ -142,7 +148,10 @@ impl std::fmt::Debug for ValuesDim1 {
} }
} }
impl AggregatableXdim1Bin for ValuesDim0 { impl<SK> AggregatableXdim1Bin<SK> for ValuesDim0
where
SK: BinnedStreamKind,
{
type Output = MinMaxAvgScalarEventBatch; type Output = MinMaxAvgScalarEventBatch;
fn into_agg(self) -> Self::Output { fn into_agg(self) -> Self::Output {
+40 -20
View File
@@ -1,6 +1,6 @@
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::agg::AggregatableXdim1Bin; use crate::agg::AggregatableXdim1Bin;
use crate::binned::RangeCompletableItem; use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
@@ -10,9 +10,12 @@ use std::collections::VecDeque;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
pub trait AggregatorTdim: Sized + Unpin { pub trait AggregatorTdim<SK>: Sized + Unpin
where
SK: BinnedStreamKind,
{
type InputValue; type InputValue;
type OutputValue: AggregatableXdim1Bin + AggregatableTdim + Unpin; type OutputValue: AggregatableXdim1Bin<SK> + AggregatableTdim<SK> + Unpin;
fn ends_before(&self, inp: &Self::InputValue) -> bool; fn ends_before(&self, inp: &Self::InputValue) -> bool;
fn ends_after(&self, inp: &Self::InputValue) -> bool; fn ends_after(&self, inp: &Self::InputValue) -> bool;
fn starts_after(&self, inp: &Self::InputValue) -> bool; fn starts_after(&self, inp: &Self::InputValue) -> bool;
@@ -20,34 +23,44 @@ pub trait AggregatorTdim: Sized + Unpin {
fn result(self) -> Vec<Self::OutputValue>; fn result(self) -> Vec<Self::OutputValue>;
} }
pub trait AggregatableTdim: Sized { pub trait AggregatableTdim<SK>: Sized
type Output: AggregatableXdim1Bin + AggregatableTdim; where
type Aggregator: AggregatorTdim<InputValue = Self>; SK: BinnedStreamKind,
{
//type Output: AggregatableXdim1Bin + AggregatableTdim;
type Aggregator: AggregatorTdim<SK, InputValue = Self, OutputValue = <SK as BinnedStreamKind>::TBinnedBins>;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator; fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
} }
pub trait IntoBinnedT { pub trait IntoBinnedT<SK>
type StreamOut: Stream; where
SK: BinnedStreamKind,
{
type StreamOut: Stream<
Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::TBinnedBins>>, Error>,
>;
fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut; fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut;
} }
impl<S, I> IntoBinnedT for S impl<S, I, SK> IntoBinnedT<SK> for S
where where
SK: BinnedStreamKind,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin, S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim + Unpin, I: AggregatableTdim<SK> + Unpin,
I::Aggregator: Unpin, I::Aggregator: Unpin,
{ {
type StreamOut = IntoBinnedTDefaultStream<S, I>; type StreamOut = IntoBinnedTDefaultStream<S, I, SK>;
fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut { fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut {
IntoBinnedTDefaultStream::new(self, spec) IntoBinnedTDefaultStream::new(self, spec)
} }
} }
pub struct IntoBinnedTDefaultStream<S, I> pub struct IntoBinnedTDefaultStream<S, I, SK>
where where
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin, S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim, I: AggregatableTdim<SK>,
SK: BinnedStreamKind,
{ {
inp: S, inp: S,
aggtor: Option<I::Aggregator>, aggtor: Option<I::Aggregator>,
@@ -60,13 +73,15 @@ where
left: Option<Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>>>, left: Option<Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>>>,
errored: bool, errored: bool,
completed: bool, completed: bool,
tmp_agg_results: VecDeque<<I::Aggregator as AggregatorTdim>::OutputValue>, tmp_agg_results: VecDeque<<I::Aggregator as AggregatorTdim<SK>>::OutputValue>,
_marker: std::marker::PhantomData<SK>,
} }
impl<S, I> IntoBinnedTDefaultStream<S, I> impl<S, I, SK> IntoBinnedTDefaultStream<S, I, SK>
where where
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin, S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim, I: AggregatableTdim<SK>,
SK: BinnedStreamKind,
{ {
pub fn new(inp: S, spec: BinnedRange) -> Self { pub fn new(inp: S, spec: BinnedRange) -> Self {
let range = spec.get_range(0); let range = spec.get_range(0);
@@ -83,6 +98,7 @@ where
errored: false, errored: false,
completed: false, completed: false,
tmp_agg_results: VecDeque::new(), tmp_agg_results: VecDeque::new(),
_marker: std::marker::PhantomData::default(),
} }
} }
@@ -116,7 +132,9 @@ where
&mut self, &mut self,
cur: Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>>, cur: Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>>,
) -> Option< ) -> Option<
Poll<Option<Result<StreamItem<RangeCompletableItem<<I::Aggregator as AggregatorTdim>::OutputValue>>, Error>>>, Poll<
Option<Result<StreamItem<RangeCompletableItem<<I::Aggregator as AggregatorTdim<SK>>::OutputValue>>, Error>>,
>,
> { > {
use Poll::*; use Poll::*;
match cur { match cur {
@@ -178,13 +196,15 @@ where
} }
} }
impl<S, I> Stream for IntoBinnedTDefaultStream<S, I> impl<S, I, SK> Stream for IntoBinnedTDefaultStream<S, I, SK>
where where
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin, S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim + Unpin, I: AggregatableTdim<SK> + Unpin,
I::Aggregator: Unpin, I::Aggregator: Unpin,
SK: BinnedStreamKind,
{ {
type Item = Result<StreamItem<RangeCompletableItem<<I::Aggregator as AggregatorTdim>::OutputValue>>, Error>; //type Item = Result<StreamItem<RangeCompletableItem<<I::Aggregator as AggregatorTdim>::OutputValue>>, Error>;
type Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::TBinnedBins>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*; use Poll::*;
+76 -8
View File
@@ -213,29 +213,97 @@ where
} }
} }
pub struct MinMaxAvgScalarBinBatchAgg {} pub struct MinMaxAvgScalarBinBatchAgg {
ts1: u64,
ts2: u64,
count: u64,
min: f32,
max: f32,
sum: f32,
sumc: u64,
}
impl MinMaxAvgScalarBinBatchAgg {
pub fn new(ts1: u64, ts2: u64) -> Self {
Self {
ts1,
ts2,
count: 0,
min: f32::MAX,
max: f32::MIN,
sum: 0f32,
sumc: 0,
}
}
}
impl AggregatorTdim2 for MinMaxAvgScalarBinBatchAgg { impl AggregatorTdim2 for MinMaxAvgScalarBinBatchAgg {
type InputValue = MinMaxAvgScalarBinBatch; type InputValue = MinMaxAvgScalarBinBatch;
fn ends_before(&self, inp: &Self::InputValue) -> bool { fn ends_before(&self, inp: &Self::InputValue) -> bool {
todo!() match inp.ts2s.last() {
Some(&ts) => ts <= self.ts1,
None => true,
}
} }
fn ends_after(&self, inp: &Self::InputValue) -> bool { fn ends_after(&self, inp: &Self::InputValue) -> bool {
todo!() match inp.ts2s.last() {
Some(&ts) => ts >= self.ts2,
_ => panic!(),
}
} }
fn starts_after(&self, inp: &Self::InputValue) -> bool { fn starts_after(&self, inp: &Self::InputValue) -> bool {
todo!() match inp.ts1s.first() {
Some(&ts) => ts >= self.ts2,
_ => panic!(),
}
} }
fn ingest(&mut self, inp: &mut Self::InputValue) { fn ingest(&mut self, v: &mut Self::InputValue) {
todo!() for i1 in 0..v.ts1s.len() {
let ts1 = v.ts1s[i1];
let ts2 = v.ts2s[i1];
if ts2 <= self.ts1 {
continue;
} else if ts1 >= self.ts2 {
continue;
} else {
self.count += v.counts[i1];
self.min = self.min.min(v.mins[i1]);
self.max = self.max.max(v.maxs[i1]);
let x = v.avgs[i1];
if x.is_nan() {
} else {
if self.sum.is_nan() {
self.sum = x;
} else {
self.sum += x;
}
self.sumc += 1;
}
}
}
} }
fn result(self) -> Vec<Self::InputValue> { fn result(self) -> Vec<Self::InputValue> {
todo!() let min = if self.min == f32::MAX { f32::NAN } else { self.min };
let max = if self.max == f32::MIN { f32::NAN } else { self.max };
let avg = if self.sumc == 0 {
f32::NAN
} else {
self.sum / self.sumc as f32
};
let v = MinMaxAvgScalarBinBatch {
ts1s: vec![self.ts1],
ts2s: vec![self.ts2],
counts: vec![self.count],
mins: vec![min],
maxs: vec![max],
avgs: vec![avg],
};
vec![v]
} }
} }
@@ -243,6 +311,6 @@ impl AggregatableTdim2 for MinMaxAvgScalarBinBatch {
type Aggregator = MinMaxAvgScalarBinBatchAgg; type Aggregator = MinMaxAvgScalarBinBatchAgg;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
todo!() Self::Aggregator::new(ts1, ts2)
} }
} }
+19 -11
View File
@@ -1,45 +1,53 @@
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::agg::AggregatableXdim1Bin; use crate::agg::AggregatableXdim1Bin;
use crate::binned::RangeCompletableItem; use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use err::Error; use err::Error;
use futures_core::Stream; use futures_core::Stream;
use futures_util::StreamExt; use futures_util::StreamExt;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
pub trait IntoBinnedXBins1<I> pub trait IntoBinnedXBins1<I, SK>
where where
SK: BinnedStreamKind,
Self: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin, Self: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableXdim1Bin, I: AggregatableXdim1Bin<SK>,
{ {
type StreamOut; type StreamOut;
fn into_binned_x_bins_1(self) -> Self::StreamOut; fn into_binned_x_bins_1(self) -> Self::StreamOut;
} }
impl<S, I> IntoBinnedXBins1<I> for S impl<S, I, SK> IntoBinnedXBins1<I, SK> for S
where where
SK: BinnedStreamKind,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin, S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableXdim1Bin, I: AggregatableXdim1Bin<SK>,
{ {
type StreamOut = IntoBinnedXBins1DefaultStream<S, I>; type StreamOut = IntoBinnedXBins1DefaultStream<S, I, SK>;
fn into_binned_x_bins_1(self) -> Self::StreamOut { fn into_binned_x_bins_1(self) -> Self::StreamOut {
IntoBinnedXBins1DefaultStream { inp: self } IntoBinnedXBins1DefaultStream {
inp: self,
_marker: std::marker::PhantomData::default(),
}
} }
} }
pub struct IntoBinnedXBins1DefaultStream<S, I> pub struct IntoBinnedXBins1DefaultStream<S, I, SK>
where where
SK: BinnedStreamKind,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin, S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableXdim1Bin, I: AggregatableXdim1Bin<SK>,
{ {
inp: S, inp: S,
_marker: std::marker::PhantomData<SK>,
} }
impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I> impl<S, I, SK> Stream for IntoBinnedXBins1DefaultStream<S, I, SK>
where where
SK: BinnedStreamKind,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin, S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableXdim1Bin, I: AggregatableXdim1Bin<SK>,
{ {
type Item = Result<StreamItem<RangeCompletableItem<I::Output>>, Error>; type Item = Result<StreamItem<RangeCompletableItem<I::Output>>, Error>;
+14 -5
View File
@@ -2,7 +2,7 @@ use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem; use crate::agg::streams::StreamItem;
use crate::agg::AggregatableXdim1Bin; use crate::agg::AggregatableXdim1Bin;
use crate::binned::{MakeBytesFrame, RangeCompletableItem}; use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem};
use crate::frame::makeframe::make_frame; use crate::frame::makeframe::make_frame;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use err::Error; use err::Error;
@@ -100,15 +100,21 @@ impl std::fmt::Debug for MinMaxAvgScalarEventBatch {
} }
} }
impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatch { impl<SK> AggregatableXdim1Bin<SK> for MinMaxAvgScalarEventBatch
where
SK: BinnedStreamKind,
{
type Output = MinMaxAvgScalarEventBatch; type Output = MinMaxAvgScalarEventBatch;
fn into_agg(self) -> Self::Output { fn into_agg(self) -> Self::Output {
self self
} }
} }
impl AggregatableTdim for MinMaxAvgScalarEventBatch { impl<SK> AggregatableTdim<SK> for MinMaxAvgScalarEventBatch
type Output = MinMaxAvgScalarBinBatch; where
SK: BinnedStreamKind,
{
//type Output = MinMaxAvgScalarBinBatch;
type Aggregator = MinMaxAvgScalarEventBatchAggregator; type Aggregator = MinMaxAvgScalarEventBatchAggregator;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
@@ -165,7 +171,10 @@ impl MinMaxAvgScalarEventBatchAggregator {
} }
} }
impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator { impl<SK> AggregatorTdim<SK> for MinMaxAvgScalarEventBatchAggregator
where
SK: BinnedStreamKind,
{
type InputValue = MinMaxAvgScalarEventBatch; type InputValue = MinMaxAvgScalarEventBatch;
type OutputValue = MinMaxAvgScalarBinBatch; type OutputValue = MinMaxAvgScalarBinBatch;
+14 -5
View File
@@ -1,7 +1,7 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::streams::{Bins, StreamItem}; use crate::agg::streams::{Bins, StreamItem};
use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside}; use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
use crate::binned::{MakeBytesFrame, RangeCompletableItem}; use crate::binned::{BinnedStreamKind, MakeBytesFrame, RangeCompletableItem};
use crate::frame::makeframe::make_frame; use crate::frame::makeframe::make_frame;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use err::Error; use err::Error;
@@ -185,15 +185,21 @@ impl MinMaxAvgScalarBinBatch {
} }
} }
impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatch { impl<SK> AggregatableXdim1Bin<SK> for MinMaxAvgScalarBinBatch
where
SK: BinnedStreamKind,
{
type Output = MinMaxAvgScalarBinBatch; type Output = MinMaxAvgScalarBinBatch;
fn into_agg(self) -> Self::Output { fn into_agg(self) -> Self::Output {
todo!() todo!()
} }
} }
impl AggregatableTdim for MinMaxAvgScalarBinBatch { impl<SK> AggregatableTdim<SK> for MinMaxAvgScalarBinBatch
type Output = MinMaxAvgScalarBinBatch; where
SK: BinnedStreamKind,
{
//type Output = MinMaxAvgScalarBinBatch;
type Aggregator = MinMaxAvgScalarBinBatchAggregator; type Aggregator = MinMaxAvgScalarBinBatchAggregator;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
@@ -231,7 +237,10 @@ impl MinMaxAvgScalarBinBatchAggregator {
} }
} }
impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator { impl<SK> AggregatorTdim<SK> for MinMaxAvgScalarBinBatchAggregator
where
SK: BinnedStreamKind,
{
type InputValue = MinMaxAvgScalarBinBatch; type InputValue = MinMaxAvgScalarBinBatch;
type OutputValue = MinMaxAvgScalarBinBatch; type OutputValue = MinMaxAvgScalarBinBatch;
+22 -16
View File
@@ -1,5 +1,6 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim}; use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::AggregatableXdim1Bin; use crate::agg::AggregatableXdim1Bin;
use crate::binned::BinnedStreamKind;
use crate::streamlog::LogItem; use crate::streamlog::LogItem;
use err::Error; use err::Error;
use netpod::EventDataReadStats; use netpod::EventDataReadStats;
@@ -36,11 +37,12 @@ pub trait ToJsonResult {
fn to_json_result(&self) -> Result<Self::Output, Error>; fn to_json_result(&self) -> Result<Self::Output, Error>;
} }
impl<T> AggregatableXdim1Bin for StreamItem<T> impl<T, SK> AggregatableXdim1Bin<SK> for StreamItem<T>
where where
T: AggregatableTdim + AggregatableXdim1Bin, SK: BinnedStreamKind,
T: AggregatableTdim<SK> + AggregatableXdim1Bin<SK>,
{ {
type Output = StreamItem<<T as AggregatableXdim1Bin>::Output>; type Output = StreamItem<<T as AggregatableXdim1Bin<SK>>::Output>;
fn into_agg(self) -> Self::Output { fn into_agg(self) -> Self::Output {
match self { match self {
@@ -51,30 +53,33 @@ where
} }
} }
pub struct StreamItemAggregator<T> pub struct StreamItemAggregator<T, SK>
where where
T: AggregatableTdim, T: AggregatableTdim<SK>,
SK: BinnedStreamKind,
{ {
inner_agg: <T as AggregatableTdim>::Aggregator, inner_agg: <T as AggregatableTdim<SK>>::Aggregator,
} }
impl<T> StreamItemAggregator<T> impl<T, SK> StreamItemAggregator<T, SK>
where where
T: AggregatableTdim, T: AggregatableTdim<SK>,
SK: BinnedStreamKind,
{ {
pub fn new(ts1: u64, ts2: u64) -> Self { pub fn new(ts1: u64, ts2: u64) -> Self {
Self { Self {
inner_agg: <T as AggregatableTdim>::aggregator_new_static(ts1, ts2), inner_agg: <T as AggregatableTdim<SK>>::aggregator_new_static(ts1, ts2),
} }
} }
} }
impl<T> AggregatorTdim for StreamItemAggregator<T> impl<T, SK> AggregatorTdim<SK> for StreamItemAggregator<T, SK>
where where
T: AggregatableTdim, T: AggregatableTdim<SK>,
SK: BinnedStreamKind,
{ {
type InputValue = StreamItem<T>; type InputValue = StreamItem<T>;
type OutputValue = StreamItem<<<T as AggregatableTdim>::Aggregator as AggregatorTdim>::OutputValue>; type OutputValue = StreamItem<<<T as AggregatableTdim<SK>>::Aggregator as AggregatorTdim<SK>>::OutputValue>;
fn ends_before(&self, inp: &Self::InputValue) -> bool { fn ends_before(&self, inp: &Self::InputValue) -> bool {
match inp { match inp {
@@ -119,12 +124,13 @@ where
} }
} }
impl<T> AggregatableTdim for StreamItem<T> impl<T, SK> AggregatableTdim<SK> for StreamItem<T>
where where
T: AggregatableTdim, T: AggregatableTdim<SK>,
SK: BinnedStreamKind,
{ {
type Output = StreamItem<<StreamItemAggregator<T> as AggregatorTdim>::OutputValue>; //type Output = StreamItem<<StreamItemAggregator<T> as AggregatorTdim>::OutputValue>;
type Aggregator = StreamItemAggregator<T>; type Aggregator = StreamItemAggregator<T, SK>;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator { fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
Self::Aggregator::new(ts1, ts2) Self::Aggregator::new(ts1, ts2)
+20 -12
View File
@@ -1,6 +1,7 @@
use super::agg::IntoDim1F32Stream; use super::agg::IntoDim1F32Stream;
use crate::agg::binnedt::IntoBinnedT; use crate::agg::binnedt::IntoBinnedT;
use crate::agg::binnedx::IntoBinnedXBins1; use crate::agg::binnedx::IntoBinnedXBins1;
use crate::binned::BinnedStreamKindScalar;
use crate::eventblobs::EventBlobsComplete; use crate::eventblobs::EventBlobsComplete;
use crate::eventchunker::EventChunkerConf; use crate::eventchunker::EventChunkerConf;
use futures_util::StreamExt; use futures_util::StreamExt;
@@ -66,8 +67,11 @@ async fn agg_x_dim_0_inner() {
event_chunker_conf, event_chunker_conf,
); );
let fut1 = IntoDim1F32Stream::into_dim_1_f32_stream(fut1); let fut1 = IntoDim1F32Stream::into_dim_1_f32_stream(fut1);
let fut1 = IntoBinnedXBins1::into_binned_x_bins_1(fut1); let fut1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(fut1);
let fut1 = IntoBinnedT::into_binned_t(fut1, BinnedRange::covering_range(range, bin_count).unwrap().unwrap()); let fut1 = IntoBinnedT::<BinnedStreamKindScalar>::into_binned_t(
fut1,
BinnedRange::covering_range(range, bin_count).unwrap().unwrap(),
);
let fut1 = fut1 let fut1 = fut1
//.into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap()) //.into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap())
.for_each(|_k| ready(())); .for_each(|_k| ready(()));
@@ -128,17 +132,21 @@ async fn agg_x_dim_1_inner() {
} }
} }
q q
}) });
.into_binned_x_bins_1() let fut1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(fut1);
.map(|k| { let fut1 = fut1.map(|k| {
//info!("after X binning {:?}", k.as_ref().unwrap()); //info!("after X binning {:?}", k.as_ref().unwrap());
k k
}) });
.into_binned_t(BinnedRange::covering_range(range, bin_count).unwrap().unwrap()) let fut1 = crate::agg::binnedt::IntoBinnedT::<BinnedStreamKindScalar>::into_binned_t(
.map(|k| { fut1,
info!("after T binning {:?}", k.as_ref().unwrap()); BinnedRange::covering_range(range, bin_count).unwrap().unwrap(),
k );
}) let fut1 = fut1
.for_each(|_k| ready(())); .map(|k| {
info!("after T binning {:?}", k.as_ref().unwrap());
k
})
.for_each(|_k| ready(()));
fut1.await; fut1.await;
} }
+10 -5
View File
@@ -473,7 +473,7 @@ impl Collectable for MinMaxAvgScalarBinBatch {
} }
} }
pub trait XBinnedEvents: pub trait XBinnedEvents<SK>:
Sized Sized
+ Unpin + Unpin
+ Send + Send
@@ -481,10 +481,12 @@ pub trait XBinnedEvents:
+ DeserializeOwned + DeserializeOwned
+ Collectable + Collectable
+ Collected + Collected
+ AggregatableTdim + AggregatableTdim<SK>
+ WithLen + WithLen
+ WithTimestamps + WithTimestamps
+ PushableIndex + PushableIndex
where
SK: BinnedStreamKind,
{ {
fn frame_type() -> u32; fn frame_type() -> u32;
} }
@@ -505,7 +507,10 @@ pub trait TBinnedBins:
fn frame_type() -> u32; fn frame_type() -> u32;
} }
impl XBinnedEvents for MinMaxAvgScalarEventBatch { impl<SK> XBinnedEvents<SK> for MinMaxAvgScalarEventBatch
where
SK: BinnedStreamKind,
{
fn frame_type() -> u32 { fn frame_type() -> u32 {
<Result<StreamItem<RangeCompletableItem<Self>>, Error> as FrameType>::FRAME_TYPE_ID <Result<StreamItem<RangeCompletableItem<Self>>, Error> as FrameType>::FRAME_TYPE_ID
} }
@@ -524,7 +529,7 @@ pub trait BinnedStreamKind: Clone + Unpin + Send + Sync + 'static
type TBinnedStreamType: Stream<Item = Result<StreamItem<RangeCompletableItem<Self::TBinnedBins>>, Error>> type TBinnedStreamType: Stream<Item = Result<StreamItem<RangeCompletableItem<Self::TBinnedBins>>, Error>>
+ Send + Send
+ 'static; + 'static;
type XBinnedEvents: XBinnedEvents; type XBinnedEvents: XBinnedEvents<Self>;
type TBinnedBins: TBinnedBins; type TBinnedBins: TBinnedBins;
fn new_binned_from_prebinned( fn new_binned_from_prebinned(
@@ -603,7 +608,7 @@ impl BinnedStreamKind for BinnedStreamKindScalar {
) -> Result<Self::TBinnedStreamType, Error> { ) -> Result<Self::TBinnedStreamType, Error> {
let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone()); let s = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone(), self.clone());
// TODO use the binned2 instead // TODO use the binned2 instead
let s = crate::agg::binnedt::IntoBinnedT::into_binned_t(s, range); let s = crate::agg::binnedt::IntoBinnedT::<Self>::into_binned_t(s, range);
Ok(BoxedStream::new(Box::pin(s))?) Ok(BoxedStream::new(Box::pin(s))?)
} }
} }
+2 -27
View File
@@ -1,4 +1,3 @@
use crate::agg::binnedt::IntoBinnedT;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch; use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::{Collectable, Collected, StreamItem}; use crate::agg::streams::{Collectable, Collected, StreamItem};
use crate::binned::RangeCompletableItem::RangeComplete; use crate::binned::RangeCompletableItem::RangeComplete;
@@ -168,33 +167,9 @@ where
self.node_config.node_config.cluster.clone(), self.node_config.node_config.cluster.clone(),
self.stream_kind.clone(), self.stream_kind.clone(),
); );
let s1 = IntoBinnedT::into_binned_t(s1, range); let s1 = crate::agg::binnedt::IntoBinnedT::<SK>::into_binned_t(s1, range);
let s1 = s1.map(|item| {
// TODO does this do anything?
match item {
Ok(item) => match item {
StreamItem::Log(item) => Ok(StreamItem::Log(item)),
StreamItem::Stats(item) => Ok(StreamItem::Stats(item)),
StreamItem::DataItem(item) => Ok(StreamItem::DataItem(item)),
/*StreamItem::DataItem(item) => match item {
MinMaxAvgScalarBinBatchStreamItem::RangeComplete => {
Ok(StreamItem::DataItem(PreBinnedScalarItem::RangeComplete))
}
MinMaxAvgScalarBinBatchStreamItem::Values(item) => {
Ok(StreamItem::DataItem(PreBinnedScalarItem::Batch(item)))
}
},*/
},
Err(e) => Err(e),
}
});
// TODO
// In the above must introduce a trait to convert to the generic item type:
// TODO!!
self.fut2 = Some(err::todoval());
//self.fut2 = Some(Box::pin(s1)); //self.fut2 = Some(Box::pin(s1));
self.fut2 = err::todoval();
} }
fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) { fn setup_from_higher_res_prebinned(&mut self, range: PreBinnedPatchRange) {
+4 -3
View File
@@ -154,7 +154,7 @@ async fn events_conn_handler_inner_try(
// TODO use a requested buffer size // TODO use a requested buffer size
let buffer_size = 1024 * 4; let buffer_size = 1024 * 4;
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
let mut s1 = EventBlobsComplete::new( let s1 = EventBlobsComplete::new(
range.clone(), range.clone(),
channel_config.clone(), channel_config.clone(),
node_config.node.clone(), node_config.node.clone(),
@@ -162,8 +162,9 @@ async fn events_conn_handler_inner_try(
buffer_size, buffer_size,
event_chunker_conf, event_chunker_conf,
) )
.into_dim_1_f32_stream() .into_dim_1_f32_stream();
.into_binned_x_bins_1(); // TODO need to decide already here on the type I want to use.
let mut s1 = IntoBinnedXBins1::<_, BinnedStreamKindScalar>::into_binned_x_bins_1(s1);
let mut e = 0; let mut e = 0;
while let Some(item) = s1.next().await { while let Some(item) = s1.next().await {
match &item { match &item {