This commit is contained in:
Dominik Werder
2021-05-25 23:41:22 +02:00
parent 83adaa7321
commit 9751660118
17 changed files with 438 additions and 349 deletions

View File

@@ -1,5 +1,6 @@
use crate::agg::streams::StreamItem;
use crate::agg::AggregatableXdim1Bin;
use crate::binned::RangeCompletableItem;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
@@ -32,7 +33,7 @@ pub trait IntoBinnedT {
impl<S, I> IntoBinnedT for S
where
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim + Unpin,
I::Aggregator: Unpin,
{
@@ -45,7 +46,7 @@ where
pub struct IntoBinnedTDefaultStream<S, I>
where
S: Stream<Item = Result<StreamItem<I>, Error>>,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim,
{
inp: S,
@@ -56,7 +57,7 @@ where
all_bins_emitted: bool,
range_complete_observed: bool,
range_complete_emitted: bool,
left: Option<Poll<Option<Result<StreamItem<I>, Error>>>>,
left: Option<Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>>>,
errored: bool,
completed: bool,
tmp_agg_results: VecDeque<<I::Aggregator as AggregatorTdim>::OutputValue>,
@@ -64,7 +65,7 @@ where
impl<S, I> IntoBinnedTDefaultStream<S, I>
where
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim,
{
pub fn new(inp: S, spec: BinnedRange) -> Self {
@@ -85,7 +86,7 @@ where
}
}
fn cur(&mut self, cx: &mut Context) -> Poll<Option<Result<StreamItem<I>, Error>>> {
fn cur(&mut self, cx: &mut Context) -> Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>> {
if let Some(cur) = self.left.take() {
cur
} else if self.inp_completed {
@@ -113,43 +114,50 @@ where
fn handle(
&mut self,
cur: Poll<Option<Result<StreamItem<I>, Error>>>,
) -> Option<Poll<Option<Result<StreamItem<<I::Aggregator as AggregatorTdim>::OutputValue>, Error>>>> {
cur: Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>>,
) -> Option<
Poll<Option<Result<StreamItem<RangeCompletableItem<<I::Aggregator as AggregatorTdim>::OutputValue>>, Error>>>,
> {
use Poll::*;
match cur {
Ready(Some(Ok(item))) => match item {
StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))),
StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))),
StreamItem::DataItem(item) => {
if item.is_range_complete() {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
self.range_complete_observed = true;
None
} else if self.all_bins_emitted {
// Just drop the item because we will not emit anymore data.
// Could also at least gather some stats.
None
} else {
let ag = self.aggtor.as_mut().unwrap();
if ag.ends_before(&item) {
None
} else if ag.starts_after(&item) {
self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item)))));
self.cycle_current_bin();
// TODO cycle_current_bin enqueues the bin, can I return here instead?
}
RangeCompletableItem::Data(item) => {
if self.all_bins_emitted {
// Just drop the item because we will not emit anymore data.
// Could also at least gather some stats.
None
} else {
let mut item = item;
ag.ingest(&mut item);
let item = item;
if ag.ends_after(&item) {
self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item)))));
let ag = self.aggtor.as_mut().unwrap();
if ag.ends_before(&item) {
None
} else if ag.starts_after(&item) {
self.left =
Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
self.cycle_current_bin();
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
} else {
let mut item = item;
ag.ingest(&mut item);
let item = item;
if ag.ends_after(&item) {
self.left =
Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
self.cycle_current_bin();
}
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
}
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
}
}
}
},
},
Ready(Some(Err(e))) => {
self.errored = true;
@@ -172,11 +180,11 @@ where
impl<S, I> Stream for IntoBinnedTDefaultStream<S, I>
where
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim + Unpin,
I::Aggregator: Unpin,
{
type Item = Result<StreamItem<<I::Aggregator as AggregatorTdim>::OutputValue>, Error>;
type Item = Result<StreamItem<RangeCompletableItem<<I::Aggregator as AggregatorTdim>::OutputValue>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
@@ -187,19 +195,14 @@ where
self.completed = true;
Ready(None)
} else if let Some(item) = self.tmp_agg_results.pop_front() {
Ready(Some(Ok(StreamItem::DataItem(item))))
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
} else if self.range_complete_emitted {
self.completed = true;
Ready(None)
} else if self.inp_completed && self.all_bins_emitted {
self.range_complete_emitted = true;
if self.range_complete_observed {
if let Some(item) = <I::Aggregator as AggregatorTdim>::OutputValue::make_range_complete_item() {
Ready(Some(Ok(StreamItem::DataItem(item))))
} else {
warn!("IntoBinnedTDefaultStream should emit RangeComplete but it doesn't have one");
continue 'outer;
}
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else {
continue 'outer;
}

View File

@@ -1,5 +1,7 @@
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem;
use crate::agg::AggregatableXdim1Bin;
use crate::binned::RangeCompletableItem;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
@@ -9,7 +11,7 @@ use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait AggregatorTdim2: Sized + Unpin {
pub trait AggregatorTdim2: Sized + Send + Unpin {
type InputValue;
fn ends_before(&self, inp: &Self::InputValue) -> bool;
fn ends_after(&self, inp: &Self::InputValue) -> bool;
@@ -21,8 +23,6 @@ pub trait AggregatorTdim2: Sized + Unpin {
pub trait AggregatableTdim2: Sized {
type Aggregator: AggregatorTdim2<InputValue = Self>;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
fn is_range_complete(&self) -> bool;
fn make_range_complete_item() -> Option<Self>;
}
pub trait IntoBinnedT {
@@ -32,7 +32,7 @@ pub trait IntoBinnedT {
impl<S, I> IntoBinnedT for S
where
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim2 + Unpin,
I::Aggregator: Unpin,
{
@@ -45,7 +45,7 @@ where
pub struct IntoBinnedTDefaultStream<S, I>
where
S: Stream<Item = Result<StreamItem<I>, Error>>,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>>,
I: AggregatableTdim2,
{
inp: S,
@@ -56,7 +56,7 @@ where
all_bins_emitted: bool,
range_complete_observed: bool,
range_complete_emitted: bool,
left: Option<Poll<Option<Result<StreamItem<I>, Error>>>>,
left: Option<Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>>>,
errored: bool,
completed: bool,
tmp_agg_results: VecDeque<I>,
@@ -64,7 +64,7 @@ where
impl<S, I> IntoBinnedTDefaultStream<S, I>
where
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim2,
{
pub fn new(inp: S, spec: BinnedRange) -> Self {
@@ -85,7 +85,7 @@ where
}
}
fn cur(&mut self, cx: &mut Context) -> Poll<Option<Result<StreamItem<I>, Error>>> {
fn cur(&mut self, cx: &mut Context) -> Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>> {
if let Some(cur) = self.left.take() {
cur
} else if self.inp_completed {
@@ -113,43 +113,47 @@ where
fn handle(
&mut self,
cur: Poll<Option<Result<StreamItem<I>, Error>>>,
) -> Option<Poll<Option<Result<StreamItem<I>, Error>>>> {
cur: Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>>,
) -> Option<Poll<Option<Result<StreamItem<RangeCompletableItem<I>>, Error>>>> {
use Poll::*;
match cur {
Ready(Some(Ok(item))) => match item {
StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))),
StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))),
StreamItem::DataItem(item) => {
if item.is_range_complete() {
self.range_complete_observed = true;
None
} else if self.all_bins_emitted {
// Just drop the item because we will not emit anymore data.
// Could also at least gather some stats.
None
} else {
let ag = self.aggtor.as_mut().unwrap();
if ag.ends_before(&item) {
None
} else if ag.starts_after(&item) {
self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item)))));
self.cycle_current_bin();
// TODO cycle_current_bin enqueues the bin, can I return here instead?
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => Some(Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::RangeComplete,
))))),
RangeCompletableItem::Data(item) => {
if self.all_bins_emitted {
// Just drop the item because we will not emit anymore data.
// Could also at least gather some stats.
None
} else {
let mut item = item;
ag.ingest(&mut item);
let item = item;
if ag.ends_after(&item) {
self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item)))));
let ag = self.aggtor.as_mut().unwrap();
if ag.ends_before(&item) {
None
} else if ag.starts_after(&item) {
self.left =
Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
self.cycle_current_bin();
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
} else {
let mut item = item;
ag.ingest(&mut item);
let item = item;
if ag.ends_after(&item) {
self.left =
Some(Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))))));
self.cycle_current_bin();
}
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
}
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
}
}
}
},
},
Ready(Some(Err(e))) => {
self.errored = true;
@@ -172,11 +176,11 @@ where
impl<S, I> Stream for IntoBinnedTDefaultStream<S, I>
where
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableTdim2 + Unpin,
I::Aggregator: Unpin,
{
type Item = Result<StreamItem<I>, Error>;
type Item = Result<StreamItem<RangeCompletableItem<I>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
@@ -187,19 +191,14 @@ where
self.completed = true;
Ready(None)
} else if let Some(item) = self.tmp_agg_results.pop_front() {
Ready(Some(Ok(StreamItem::DataItem(item))))
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))))
} else if self.range_complete_emitted {
self.completed = true;
Ready(None)
} else if self.inp_completed && self.all_bins_emitted {
self.range_complete_emitted = true;
if self.range_complete_observed {
if let Some(item) = I::make_range_complete_item() {
Ready(Some(Ok(StreamItem::DataItem(item))))
} else {
warn!("IntoBinnedTDefaultStream should emit RangeComplete but it doesn't have one");
continue 'outer;
}
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else {
continue 'outer;
}
@@ -213,3 +212,37 @@ where
}
}
}
pub struct MinMaxAvgScalarBinBatchAgg {}
impl AggregatorTdim2 for MinMaxAvgScalarBinBatchAgg {
type InputValue = MinMaxAvgScalarBinBatch;
fn ends_before(&self, inp: &Self::InputValue) -> bool {
todo!()
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
todo!()
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
todo!()
}
fn ingest(&mut self, inp: &mut Self::InputValue) {
todo!()
}
fn result(self) -> Vec<Self::InputValue> {
todo!()
}
}
impl AggregatableTdim2 for MinMaxAvgScalarBinBatch {
type Aggregator = MinMaxAvgScalarBinBatchAgg;
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator {
todo!()
}
}

View File

@@ -1,5 +1,6 @@
use crate::agg::streams::StreamItem;
use crate::agg::AggregatableXdim1Bin;
use crate::binned::RangeCompletableItem;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
@@ -8,20 +9,19 @@ use std::task::{Context, Poll};
pub trait IntoBinnedXBins1<I>
where
Self: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableXdim1Bin,
{
type StreamOut;
fn into_binned_x_bins_1(self) -> Self::StreamOut
where
Self: Stream<Item = Result<StreamItem<I>, Error>>;
fn into_binned_x_bins_1(self) -> Self::StreamOut;
}
impl<T, I> IntoBinnedXBins1<I> for T
impl<S, I> IntoBinnedXBins1<I> for S
where
T: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableXdim1Bin,
{
type StreamOut = IntoBinnedXBins1DefaultStream<T, I>;
type StreamOut = IntoBinnedXBins1DefaultStream<S, I>;
fn into_binned_x_bins_1(self) -> Self::StreamOut {
IntoBinnedXBins1DefaultStream { inp: self }
@@ -30,7 +30,7 @@ where
pub struct IntoBinnedXBins1DefaultStream<S, I>
where
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableXdim1Bin,
{
inp: S,
@@ -38,10 +38,10 @@ where
impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I>
where
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<RangeCompletableItem<I>>, Error>> + Unpin,
I: AggregatableXdim1Bin,
{
type Item = Result<StreamItem<I::Output>, Error>;
type Item = Result<StreamItem<RangeCompletableItem<I::Output>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
@@ -49,7 +49,14 @@ where
Ready(Some(Ok(k))) => match k {
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
StreamItem::DataItem(item) => Ready(Some(Ok(StreamItem::DataItem(item.into_agg())))),
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
}
RangeCompletableItem::Data(item) => Ready(Some(Ok(StreamItem::DataItem(
RangeCompletableItem::Data(item.into_agg()),
)))),
},
},
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => Ready(None),

View File

@@ -1,7 +1,11 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
use crate::agg::streams::StreamItem;
use crate::agg::AggregatableXdim1Bin;
use crate::binned::{MakeBytesFrame, RangeCompletableItem};
use crate::frame::makeframe::make_frame;
use bytes::{BufMut, Bytes, BytesMut};
use err::Error;
use netpod::log::*;
use serde::{Deserialize, Serialize};
use std::mem::size_of;
@@ -230,3 +234,9 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
vec![v]
}
}
impl MakeBytesFrame for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarEventBatch>>, Error> {
fn make_bytes_frame(&self) -> Result<Bytes, Error> {
Ok(make_frame(self)?.freeze())
}
}

View File

@@ -1,5 +1,5 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::streams::Bins;
use crate::agg::streams::{Bins, StreamItem};
use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
use crate::binned::{MakeBytesFrame, RangeCompletableItem};
use crate::frame::makeframe::make_frame;
@@ -302,7 +302,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
}
}
impl MakeBytesFrame for Result<RangeCompletableItem<MinMaxAvgScalarBinBatch>, Error> {
impl MakeBytesFrame for Result<StreamItem<RangeCompletableItem<MinMaxAvgScalarBinBatch>>, Error> {
fn make_bytes_frame(&self) -> Result<Bytes, Error> {
Ok(make_frame(self)?.freeze())
}

View File

@@ -28,7 +28,7 @@ pub trait Collected {
pub trait Collectable {
type Collected: Collected;
fn append_to(&mut self, collected: &mut Self::Collected);
fn append_to(&self, collected: &mut Self::Collected);
}
pub trait ToJsonResult {