It compiles

This commit is contained in:
Dominik Werder
2021-05-20 20:04:21 +02:00
parent 49af7ce561
commit 30be7d1c44
25 changed files with 424 additions and 663 deletions

View File

@@ -1,10 +1,10 @@
use crate::agg::streams::StreamItem;
use crate::agg::AggregatableXdim1Bin;
use crate::streamlog::LogItem;
use err::Error;
use futures_core::Stream;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::{BinnedRange, EventDataReadStats};
use netpod::BinnedRange;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
@@ -25,12 +25,6 @@ pub trait AggregatableTdim: Sized {
fn aggregator_new_static(ts1: u64, ts2: u64) -> Self::Aggregator;
fn is_range_complete(&self) -> bool;
fn make_range_complete_item() -> Option<Self>;
fn is_log_item(&self) -> bool;
fn log_item(self) -> Option<LogItem>;
fn make_log_item(item: LogItem) -> Option<Self>;
fn is_stats_item(&self) -> bool;
fn stats_item(self) -> Option<EventDataReadStats>;
fn make_stats_item(item: EventDataReadStats) -> Option<Self>;
}
pub trait IntoBinnedT {
@@ -40,9 +34,8 @@ pub trait IntoBinnedT {
impl<S, I> IntoBinnedT for S
where
S: Stream<Item = Result<I, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
I: AggregatableTdim + Unpin,
//I: AggregatableTdim,
I::Aggregator: Unpin,
{
type StreamOut = IntoBinnedTDefaultStream<S, I>;
@@ -54,7 +47,7 @@ where
pub struct IntoBinnedTDefaultStream<S, I>
where
S: Stream<Item = Result<I, Error>>,
S: Stream<Item = Result<StreamItem<I>, Error>>,
I: AggregatableTdim,
{
inp: S,
@@ -65,7 +58,7 @@ where
all_bins_emitted: bool,
range_complete_observed: bool,
range_complete_emitted: bool,
left: Option<Poll<Option<Result<I, Error>>>>,
left: Option<Poll<Option<Result<StreamItem<I>, Error>>>>,
errored: bool,
completed: bool,
tmp_agg_results: VecDeque<<I::Aggregator as AggregatorTdim>::OutputValue>,
@@ -73,7 +66,7 @@ where
impl<S, I> IntoBinnedTDefaultStream<S, I>
where
S: Stream<Item = Result<I, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
I: AggregatableTdim,
{
pub fn new(inp: S, spec: BinnedRange) -> Self {
@@ -94,7 +87,7 @@ where
}
}
fn cur(&mut self, cx: &mut Context) -> Poll<Option<Result<I, Error>>> {
fn cur(&mut self, cx: &mut Context) -> Poll<Option<Result<StreamItem<I>, Error>>> {
if let Some(cur) = self.left.take() {
cur
} else if self.inp_completed {
@@ -122,64 +115,44 @@ where
fn handle(
&mut self,
cur: Poll<Option<Result<I, Error>>>,
) -> Option<Poll<Option<Result<<I::Aggregator as AggregatorTdim>::OutputValue, Error>>>> {
cur: Poll<Option<Result<StreamItem<I>, Error>>>,
) -> Option<Poll<Option<Result<StreamItem<<I::Aggregator as AggregatorTdim>::OutputValue>, Error>>>> {
use Poll::*;
match cur {
Ready(Some(Ok(k))) => {
if k.is_range_complete() {
self.range_complete_observed = true;
None
} else if k.is_log_item() {
if let Some(item) = k.log_item() {
if let Some(item) = <I::Aggregator as AggregatorTdim>::OutputValue::make_log_item(item) {
Some(Ready(Some(Ok(item))))
} else {
error!("IntoBinnedTDefaultStream can not create log item");
Ready(Some(Ok(item))) => match item {
StreamItem::Log(item) => Some(Ready(Some(Ok(StreamItem::Log(item))))),
StreamItem::Stats(item) => Some(Ready(Some(Ok(StreamItem::Stats(item))))),
StreamItem::DataItem(item) => {
if item.is_range_complete() {
self.range_complete_observed = true;
None
} else if self.all_bins_emitted {
// Just drop the item because we will not emit anymore data.
// Could also at least gather some stats.
None
} else {
let ag = self.aggtor.as_mut().unwrap();
if ag.ends_before(&item) {
None
}
} else {
error!("supposed to be log item but can't take it");
None
}
} else if k.is_stats_item() {
if let Some(item) = k.stats_item() {
if let Some(item) = <I::Aggregator as AggregatorTdim>::OutputValue::make_stats_item(item) {
Some(Ready(Some(Ok(item))))
} else {
error!("IntoBinnedTDefaultStream can not create stats item");
None
}
} else {
error!("supposed to be stats item but can't take it");
None
}
} else if self.all_bins_emitted {
// Just drop the item because we will not emit anymore data.
// Could also at least gather some stats.
None
} else {
let ag = self.aggtor.as_mut().unwrap();
if ag.ends_before(&k) {
None
} else if ag.starts_after(&k) {
self.left = Some(Ready(Some(Ok(k))));
self.cycle_current_bin();
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
} else {
let mut k = k;
ag.ingest(&mut k);
let k = k;
if ag.ends_after(&k) {
self.left = Some(Ready(Some(Ok(k))));
} else if ag.starts_after(&item) {
self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item)))));
self.cycle_current_bin();
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
} else {
let mut item = item;
ag.ingest(&mut item);
let item = item;
if ag.ends_after(&item) {
self.left = Some(Ready(Some(Ok(StreamItem::DataItem(item)))));
self.cycle_current_bin();
}
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
}
// TODO cycle_current_bin enqueues the bin, can I return here instead?
None
}
}
}
},
Ready(Some(Err(e))) => {
self.errored = true;
Some(Ready(Some(Err(e))))
@@ -201,12 +174,11 @@ where
impl<S, I> Stream for IntoBinnedTDefaultStream<S, I>
where
S: Stream<Item = Result<I, Error>> + Unpin,
//I: AggregatableTdim,
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
I: AggregatableTdim + Unpin,
I::Aggregator: Unpin,
{
type Item = Result<<I::Aggregator as AggregatorTdim>::OutputValue, Error>;
type Item = Result<StreamItem<<I::Aggregator as AggregatorTdim>::OutputValue>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
@@ -217,17 +189,15 @@ where
self.completed = true;
Ready(None)
} else if let Some(item) = self.tmp_agg_results.pop_front() {
Ready(Some(Ok(item)))
Ready(Some(Ok(StreamItem::DataItem(item))))
} else if self.range_complete_emitted {
self.completed = true;
Ready(None)
} else if self.inp_completed && self.all_bins_emitted {
self.range_complete_emitted = true;
if self.range_complete_observed {
// TODO why can't I declare that type alias?
//type TT = I;
if let Some(item) = <I::Aggregator as AggregatorTdim>::OutputValue::make_range_complete_item() {
Ready(Some(Ok(item)))
Ready(Some(Ok(StreamItem::DataItem(item))))
} else {
warn!("IntoBinnedTDefaultStream should emit RangeComplete but it doesn't have one");
continue 'outer;

View File

@@ -1,3 +1,4 @@
use crate::agg::streams::StreamItem;
use crate::agg::AggregatableXdim1Bin;
use err::Error;
use futures_core::Stream;
@@ -5,16 +6,20 @@ use futures_util::StreamExt;
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait IntoBinnedXBins1<I: AggregatableXdim1Bin> {
pub trait IntoBinnedXBins1<I>
where
I: AggregatableXdim1Bin,
{
type StreamOut;
fn into_binned_x_bins_1(self) -> Self::StreamOut
where
Self: Stream<Item = Result<I, Error>>;
Self: Stream<Item = Result<StreamItem<I>, Error>>;
}
impl<T, I: AggregatableXdim1Bin> IntoBinnedXBins1<I> for T
impl<T, I> IntoBinnedXBins1<I> for T
where
T: Stream<Item = Result<I, Error>> + Unpin,
T: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
I: AggregatableXdim1Bin,
{
type StreamOut = IntoBinnedXBins1DefaultStream<T, I>;
@@ -25,7 +30,7 @@ where
pub struct IntoBinnedXBins1DefaultStream<S, I>
where
S: Stream<Item = Result<I, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
I: AggregatableXdim1Bin,
{
inp: S,
@@ -33,15 +38,19 @@ where
impl<S, I> Stream for IntoBinnedXBins1DefaultStream<S, I>
where
S: Stream<Item = Result<I, Error>> + Unpin,
S: Stream<Item = Result<StreamItem<I>, Error>> + Unpin,
I: AggregatableXdim1Bin,
{
type Item = Result<I::Output, Error>;
type Item = Result<StreamItem<I::Output>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(k))) => Ready(Some(Ok(k.into_agg()))),
Ready(Some(Ok(k))) => match k {
StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))),
StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))),
StreamItem::DataItem(item) => Ready(Some(Ok(StreamItem::DataItem(item.into_agg())))),
},
Ready(Some(Err(e))) => Ready(Some(Err(e))),
Ready(None) => Ready(None),
Pending => Pending,

View File

@@ -1,10 +1,8 @@
use crate::agg::binnedt::{AggregatableTdim, AggregatorTdim};
use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
use crate::agg::AggregatableXdim1Bin;
use crate::streamlog::LogItem;
use bytes::{BufMut, Bytes, BytesMut};
use netpod::log::*;
use netpod::EventDataReadStats;
use serde::{Deserialize, Serialize};
use std::mem::size_of;
@@ -120,30 +118,6 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch {
fn make_range_complete_item() -> Option<Self> {
None
}
fn is_log_item(&self) -> bool {
false
}
fn log_item(self) -> Option<LogItem> {
None
}
fn make_log_item(_item: LogItem) -> Option<Self> {
None
}
fn is_stats_item(&self) -> bool {
false
}
fn stats_item(self) -> Option<EventDataReadStats> {
None
}
fn make_stats_item(_item: EventDataReadStats) -> Option<Self> {
None
}
}
impl MinMaxAvgScalarEventBatch {
@@ -269,8 +243,6 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchAggregator {
pub enum MinMaxAvgScalarEventBatchStreamItem {
Values(MinMaxAvgScalarEventBatch),
RangeComplete,
EventDataReadStats(EventDataReadStats),
Log(LogItem),
}
impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem {
@@ -301,46 +273,6 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem {
fn make_range_complete_item() -> Option<Self> {
Some(MinMaxAvgScalarEventBatchStreamItem::RangeComplete)
}
fn is_log_item(&self) -> bool {
if let MinMaxAvgScalarEventBatchStreamItem::Log(_) = self {
true
} else {
false
}
}
fn log_item(self) -> Option<LogItem> {
if let MinMaxAvgScalarEventBatchStreamItem::Log(item) = self {
Some(item)
} else {
None
}
}
fn make_log_item(item: LogItem) -> Option<Self> {
Some(MinMaxAvgScalarEventBatchStreamItem::Log(item))
}
fn is_stats_item(&self) -> bool {
if let MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_) = self {
true
} else {
false
}
}
fn stats_item(self) -> Option<EventDataReadStats> {
if let MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(item) = self {
Some(item)
} else {
None
}
}
fn make_stats_item(item: EventDataReadStats) -> Option<Self> {
Some(MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(item))
}
}
pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {
@@ -382,9 +314,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator {
fn ingest(&mut self, inp: &mut Self::InputValue) {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals),
MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_) => panic!(),
MinMaxAvgScalarEventBatchStreamItem::RangeComplete => panic!(),
MinMaxAvgScalarEventBatchStreamItem::Log(_) => panic!(),
}
}

View File

@@ -3,12 +3,11 @@ use crate::agg::streams::Bins;
use crate::agg::{AggregatableXdim1Bin, Fits, FitsInside};
use crate::binned::MakeBytesFrame;
use crate::frame::makeframe::make_frame;
use crate::streamlog::LogItem;
use bytes::{BufMut, Bytes, BytesMut};
use err::Error;
use netpod::log::*;
use netpod::timeunits::SEC;
use netpod::{EventDataReadStats, NanoRange};
use netpod::NanoRange;
use serde::{Deserialize, Serialize};
use std::mem::size_of;
@@ -208,30 +207,6 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatch {
fn make_range_complete_item() -> Option<Self> {
None
}
fn is_log_item(&self) -> bool {
false
}
fn log_item(self) -> Option<LogItem> {
None
}
fn make_log_item(_item: LogItem) -> Option<Self> {
None
}
fn is_stats_item(&self) -> bool {
false
}
fn stats_item(self) -> Option<EventDataReadStats> {
None
}
fn make_stats_item(_item: EventDataReadStats) -> Option<Self> {
None
}
}
impl Bins for MinMaxAvgScalarBinBatch {
@@ -339,8 +314,6 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchAggregator {
pub enum MinMaxAvgScalarBinBatchStreamItem {
Values(MinMaxAvgScalarBinBatch),
RangeComplete,
EventDataReadStats(EventDataReadStats),
Log(LogItem),
}
impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
@@ -362,46 +335,6 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem {
fn make_range_complete_item() -> Option<Self> {
Some(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)
}
fn is_log_item(&self) -> bool {
if let MinMaxAvgScalarBinBatchStreamItem::Log(_) = self {
true
} else {
false
}
}
fn log_item(self) -> Option<LogItem> {
if let MinMaxAvgScalarBinBatchStreamItem::Log(item) = self {
Some(item)
} else {
None
}
}
fn make_log_item(item: LogItem) -> Option<Self> {
Some(MinMaxAvgScalarBinBatchStreamItem::Log(item))
}
fn is_stats_item(&self) -> bool {
if let MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(_) = self {
true
} else {
false
}
}
fn stats_item(self) -> Option<EventDataReadStats> {
if let MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(item) = self {
Some(item)
} else {
None
}
}
fn make_stats_item(item: EventDataReadStats) -> Option<Self> {
Some(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(item))
}
}
impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem {
@@ -457,9 +390,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
fn ingest(&mut self, inp: &mut Self::InputValue) {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals),
MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(_) => panic!(),
MinMaxAvgScalarBinBatchStreamItem::RangeComplete => panic!(),
MinMaxAvgScalarBinBatchStreamItem::Log(_) => panic!(),
}
}

View File

@@ -38,20 +38,16 @@ pub trait ToJsonResult {
impl<T> AggregatableXdim1Bin for StreamItem<T>
where
// TODO bound on the Output ???
//T: AggregatableTdim + AggregatableXdim1Bin<Output = T>,
T: AggregatableTdim + AggregatableXdim1Bin,
{
type Output = StreamItem<<T as AggregatableXdim1Bin>::Output>;
fn into_agg(self) -> Self::Output {
// TODO how to handle the type mismatch?
/*match self {
Self::Log(item) => Self::Log(item),
Self::Stats(item) => Self::Stats(item),
Self::DataItem(item) => Self::DataItem(item.into_agg()),
}*/
err::todoval()
match self {
Self::Log(item) => Self::Output::Log(item),
Self::Stats(item) => Self::Output::Stats(item),
Self::DataItem(item) => Self::Output::DataItem(item.into_agg()),
}
}
}
@@ -60,7 +56,6 @@ where
T: AggregatableTdim,
{
inner_agg: <T as AggregatableTdim>::Aggregator,
_mark: std::marker::PhantomData<T>,
}
impl<T> StreamItemAggregator<T>
@@ -70,7 +65,6 @@ where
pub fn new(ts1: u64, ts2: u64) -> Self {
Self {
inner_agg: <T as AggregatableTdim>::aggregator_new_static(ts1, ts2),
_mark: std::marker::PhantomData::default(),
}
}
}
@@ -83,23 +77,45 @@ where
type OutputValue = StreamItem<<<T as AggregatableTdim>::Aggregator as AggregatorTdim>::OutputValue>;
fn ends_before(&self, inp: &Self::InputValue) -> bool {
todo!()
match inp {
StreamItem::Log(_) => false,
StreamItem::Stats(_) => false,
StreamItem::DataItem(item) => self.inner_agg.ends_before(item),
}
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
todo!()
match inp {
StreamItem::Log(_) => false,
StreamItem::Stats(_) => false,
StreamItem::DataItem(item) => self.inner_agg.ends_after(item),
}
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
todo!()
match inp {
StreamItem::Log(_) => false,
StreamItem::Stats(_) => false,
StreamItem::DataItem(item) => self.inner_agg.starts_after(item),
}
}
fn ingest(&mut self, inp: &mut Self::InputValue) {
todo!()
match inp {
StreamItem::Log(_) => {}
StreamItem::Stats(_) => {}
StreamItem::DataItem(item) => {
self.inner_agg.ingest(item);
}
}
}
fn result(self) -> Vec<Self::OutputValue> {
todo!()
self.inner_agg
.result()
.into_iter()
.map(|k| StreamItem::DataItem(k))
.collect()
}
}
@@ -129,51 +145,4 @@ where
None => None,
}
}
// TODO refactor: the point of having the StreamItem is that this function is no longer necessary:
fn is_log_item(&self) -> bool {
if let Self::Log(_) = self {
true
} else {
false
}
}
// TODO should be able to remove this from trait:
fn log_item(self) -> Option<LogItem> {
if let Self::Log(item) = self {
Some(item)
} else {
None
}
}
// TODO should be able to remove this from trait:
fn make_log_item(item: LogItem) -> Option<Self> {
Some(Self::Log(item))
}
// TODO should be able to remove this from trait:
fn is_stats_item(&self) -> bool {
if let Self::Stats(_) = self {
true
} else {
false
}
}
// TODO should be able to remove this from trait:
fn stats_item(self) -> Option<EventDataReadStats> {
if let Self::Stats(_item) = self {
// TODO this whole function should no longer be needed.
Some(err::todoval())
} else {
None
}
}
// TODO should be able to remove this from trait:
fn make_stats_item(item: EventDataReadStats) -> Option<Self> {
Some(Self::Stats(StatsItem::EventDataReadStats(item)))
}
}