Transition to more specific stage 1 binning, compiles

This commit is contained in:
Dominik Werder
2021-05-26 14:43:06 +02:00
parent 2b1be2f2b9
commit 0ab17d35da
9 changed files with 179 additions and 149 deletions

View File

@@ -15,7 +15,7 @@ where
SK: BinnedStreamKind,
{
type InputValue;
type OutputValue: AggregatableXdim1Bin<SK> + AggregatableTdim<SK> + Unpin;
type OutputValue;
fn ends_before(&self, inp: &Self::InputValue) -> bool;
fn ends_after(&self, inp: &Self::InputValue) -> bool;
fn starts_after(&self, inp: &Self::InputValue) -> bool;
@@ -27,67 +27,63 @@ pub trait AggregatableTdim<SK>: Sized
where
SK: BinnedStreamKind,
{
//type Output: AggregatableXdim1Bin + AggregatableTdim;
type Aggregator: AggregatorTdim<SK, InputValue = Self, OutputValue = <SK as BinnedStreamKind>::TBinnedBins>;
type Aggregator: AggregatorTdim<SK>;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
}
pub trait IntoBinnedT<SK>
pub trait IntoBinnedT<SK, S>
where
SK: BinnedStreamKind,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>> + Unpin,
{
type StreamOut: Stream<
Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::TBinnedBins>>, Error>,
>;
fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut;
fn into_binned_t(self, spec: BinnedRange) -> IntoBinnedTDefaultStream<SK, S>;
}
impl<S, I, SK> IntoBinnedT<SK> for S
impl<SK, S> IntoBinnedT<SK, S> for S
where
SK: BinnedStreamKind,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim<SK> + Unpin,
I::Aggregator: Unpin,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>> + Unpin,
{
type StreamOut = IntoBinnedTDefaultStream<S, I, SK>;
fn into_binned_t(self, spec: BinnedRange) -> Self::StreamOut {
fn into_binned_t(self, spec: BinnedRange) -> IntoBinnedTDefaultStream<SK, S> {
IntoBinnedTDefaultStream::new(self, spec)
}
}
pub struct IntoBinnedTDefaultStream<S, I, SK>
pub struct IntoBinnedTDefaultStream<SK, S>
where
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim<SK>,
SK: BinnedStreamKind,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>> + Unpin,
{
inp: S,
aggtor: Option<I::Aggregator>,
aggtor: Option<<<SK as BinnedStreamKind>::XBinnedEvents as AggregatableTdim<SK>>::Aggregator>,
spec: BinnedRange,
curbin: u32,
inp_completed: bool,
all_bins_emitted: bool,
range_complete_observed: bool,
range_complete_emitted: bool,
left: Option<Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>>>,
left:
Option<Poll<Option<Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>>>>,
errored: bool,
completed: bool,
tmp_agg_results: VecDeque<<I::Aggregator as AggregatorTdim<SK>>::OutputValue>,
tmp_agg_results: VecDeque<<SK as BinnedStreamKind>::TBinnedBins>,
_marker: std::marker::PhantomData<SK>,
}
impl<S, I, SK> IntoBinnedTDefaultStream<S, I, SK>
impl<SK, S> IntoBinnedTDefaultStream<SK, S>
where
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim<SK>,
SK: BinnedStreamKind,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>> + Unpin,
{
pub fn new(inp: S, spec: BinnedRange) -> Self {
let range = spec.get_range(0);
Self {
inp,
aggtor: Some(I::aggregator_new_static(range.beg, range.end)),
aggtor: Some(
<<SK as BinnedStreamKind>::XBinnedEvents as AggregatableTdim<SK>>::aggregator_new_static(
range.beg, range.end,
),
),
spec,
curbin: 0,
inp_completed: false,
@@ -102,7 +98,10 @@ where
}
}
fn cur(&mut self, cx: &mut Context) -> Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>> {
fn cur(
&mut self,
cx: &mut Context,
) -> Poll<Option<Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>>> {
if let Some(cur) = self.left.take() {
cur
} else if self.inp_completed {
@@ -118,11 +117,16 @@ where
let range = self.spec.get_range(self.curbin);
let ret = self
.aggtor
.replace(I::aggregator_new_static(range.beg, range.end))
.replace(
<<SK as BinnedStreamKind>::XBinnedEvents as AggregatableTdim<SK>>::aggregator_new_static(
range.beg, range.end,
),
)
// TODO handle None case, or remove Option if Agg is always present
.unwrap()
.result();
self.tmp_agg_results = ret.into();
//self.tmp_agg_results = VecDeque::from(ret);
self.tmp_agg_results = VecDeque::new();
if self.curbin >= self.spec.count as u32 {
self.all_bins_emitted = true;
}
@@ -130,12 +134,9 @@ where
fn handle(
&mut self,
cur: Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>>,
) -> Option<
Poll<
Option<Result<StreamItem<RangeCompletableItem<<I::Aggregator as AggregatorTdim<SK>>::OutputValue>>, Error>>,
>,
> {
cur: Poll<Option<Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>>>,
) -> Option<Poll<Option<Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::TBinnedBins>>, Error>>>>
{
use Poll::*;
match cur {
Ready(Some(Ok(item))) => match item {
@@ -153,9 +154,11 @@ where
None
} else {
let ag = self.aggtor.as_mut().unwrap();
if ag.ends_before(&item) {
//if ag.ends_before(&item) {
if ag.ends_before(err::todoval()) {
None
} else if ag.starts_after(&item) {
//} else if ag.starts_after(&item) {
} else if ag.starts_after(err::todoval()) {
self.left =
Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
self.cycle_current_bin();
@@ -163,9 +166,11 @@ where
None
} else {
let mut item = item;
ag.ingest(&mut item);
//ag.ingest(&mut item);
ag.ingest(err::todoval());
let item = item;
if ag.ends_after(&item) {
//if ag.ends_after(&item) {
if ag.ends_after(err::todoval()) {
self.left =
Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
self.cycle_current_bin();
@@ -196,14 +201,11 @@ where
}
}
impl<S, I, SK> Stream for IntoBinnedTDefaultStream<S, I, SK>
impl<SK, S> Stream for IntoBinnedTDefaultStream<SK, S>
where
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim<SK> + Unpin,
I::Aggregator: Unpin,
SK: BinnedStreamKind,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>> + Unpin,
{
//type Item = Result<StreamItem<RangeCompletableItem<<I::Aggregator as AggregatorTdim>::OutputValue>>, Error>;
type Item = Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::TBinnedBins>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {

101
disk/src/agg/binnedt3.rs Normal file
View File

@@ -0,0 +1,101 @@
use crate::agg::eventbatch::MinMaxAvgScalarEventBatch;
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem;
use crate::binned::{BinnedStreamKind, RangeCompletableItem};
use err::Error;
use futures_core::Stream;
use netpod::BinnedRange;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Aggregator3Tdim {
type InputValue;
type OutputValue;
}
pub struct Agg3 {}
impl Aggregator3Tdim for Agg3 {
type InputValue = MinMaxAvgScalarEventBatch;
type OutputValue = MinMaxAvgScalarBinBatch;
}
pub struct BinnedT3Stream {
// TODO get rid of box:
inp: Pin<Box<dyn Stream<Item = MinMaxAvgScalarEventBatch> + Send>>,
//aggtor: Option<<<SK as BinnedStreamKind>::XBinnedEvents as AggregatableTdim<SK>>::Aggregator>,
aggtor: Option<()>,
spec: BinnedRange,
curbin: u32,
inp_completed: bool,
all_bins_emitted: bool,
range_complete_observed: bool,
range_complete_emitted: bool,
//left: Option<Poll<Option<Result<StreamItem<RangeCompletableItem<<SK as BinnedStreamKind>::XBinnedEvents>>, Error>>>>,
left: Option<()>,
errored: bool,
completed: bool,
tmp_agg_results: VecDeque<MinMaxAvgScalarBinBatch>,
}
impl BinnedT3Stream {
pub fn new<S>(inp: S, spec: BinnedRange) -> Self
where
S: Stream<Item = MinMaxAvgScalarEventBatch> + Send + 'static,
{
// TODO simplify here, get rid of numeric parameter:
let range = spec.get_range(0);
Self {
inp: Box::pin(inp),
aggtor: None,
spec,
curbin: 0,
inp_completed: false,
all_bins_emitted: false,
range_complete_observed: false,
range_complete_emitted: false,
left: None,
errored: false,
completed: false,
tmp_agg_results: VecDeque::new(),
}
}
}
impl Stream for BinnedT3Stream {
type Item = Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
break if self.completed {
panic!("IntoBinnedTDefaultStream poll_next on completed");
} else if self.errored {
self.completed = true;
Ready(None)
} else if let Some(item) = self.tmp_agg_results.pop_front() {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
} else if self.range_complete_emitted {
self.completed = true;
Ready(None)
} else if self.inp_completed && self.all_bins_emitted {
self.range_complete_emitted = true;
if self.range_complete_observed {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else {
continue 'outer;
}
} else {
err::todo();
Pending
// TODO `cur` and `handle` are not yet taken over from binnedt.rs
/*let cur = self.cur(cx);
match self.handle(cur) {
Some(item) => item,
None => continue 'outer,
}*/
};
}
}
}

View File

@@ -232,6 +232,12 @@ where
} else {
self.sum / self.sumc as f32
};
// TODO impl problem:
// The return type of this function must be the concrete type that I implement for.
// Otherwise I have no chance building that values.
// I must somehow differently couple that to the SK.
let v = MinMaxAvgScalarBinBatch {
ts1s: vec![self.ts1],
ts2s: vec![self.ts2],

View File

@@ -36,103 +36,3 @@ pub trait ToJsonResult {
type Output;
fn to_json_result(&self) -> Result<Self::Output, Error>;
}
impl<T, SK> AggregatableXdim1Bin<SK> for StreamItem<T>
where
SK: BinnedStreamKind,
T: AggregatableTdim<SK> + AggregatableXdim1Bin<SK>,
{
type Output = StreamItem<<T as AggregatableXdim1Bin<SK>>::Output>;
fn into_agg(self) -> Self::Output {
match self {
Self::Log(item) => Self::Output::Log(item),
Self::Stats(item) => Self::Output::Stats(item),
Self::DataItem(item) => Self::Output::DataItem(item.into_agg()),
}
}
}
pub struct StreamItemAggregator<T, SK>
where
T: AggregatableTdim<SK>,
SK: BinnedStreamKind,
{
inner_agg: <T as AggregatableTdim<SK>>::Aggregator,
}
impl<T, SK> StreamItemAggregator<T, SK>
where
T: AggregatableTdim<SK>,
SK: BinnedStreamKind,
{
pub fn new(ts1: u64, ts2: u64) -> Self {
Self {
inner_agg: <T as AggregatableTdim<SK>>::aggregator_new_static(ts1, ts2),
}
}
}
impl<T, SK> AggregatorTdim<SK> for StreamItemAggregator<T, SK>
where
T: AggregatableTdim<SK>,
SK: BinnedStreamKind,
{
type InputValue = StreamItem<T>;
type OutputValue = StreamItem<<<T as AggregatableTdim<SK>>::Aggregator as AggregatorTdim<SK>>::OutputValue>;
fn ends_before(&self, inp: &Self::InputValue) -> bool {
match inp {
StreamItem::Log(_) => false,
StreamItem::Stats(_) => false,
StreamItem::DataItem(item) => self.inner_agg.ends_before(item),
}
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
match inp {
StreamItem::Log(_) => false,
StreamItem::Stats(_) => false,
StreamItem::DataItem(item) => self.inner_agg.ends_after(item),
}
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
match inp {
StreamItem::Log(_) => false,
StreamItem::Stats(_) => false,
StreamItem::DataItem(item) => self.inner_agg.starts_after(item),
}
}
fn ingest(&mut self, inp: &mut Self::InputValue) {
match inp {
StreamItem::Log(_) => {}
StreamItem::Stats(_) => {}
StreamItem::DataItem(item) => {
self.inner_agg.ingest(item);
}
}
}
fn result(self) -> Vec<Self::OutputValue> {
self.inner_agg
.result()
.into_iter()
.map(|k| StreamItem::DataItem(k))
.collect()
}
}
impl<T, SK> AggregatableTdim<SK> for StreamItem<T>
where
T: AggregatableTdim<SK>,
SK: BinnedStreamKind,
{
//type Output = StreamItem<<StreamItemAggregator<T> as AggregatorTdim>::OutputValue>;
type Aggregator = StreamItemAggregator<T, SK>;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
Self::Aggregator::new(ts1, ts2)
}
}