This commit is contained in:
Dominik Werder
2021-05-04 20:24:13 +02:00
parent be36dcce89
commit 6ebb03a2f2
4 changed files with 70 additions and 38 deletions

View File

@@ -501,7 +501,10 @@ impl AggregatableXdim1Bin for Dim1F32StreamItem {
fn into_agg(self) -> Self::Output {
match self {
Dim1F32StreamItem::Values(vals) => MinMaxAvgScalarEventBatchStreamItem::Values(vals.into_agg()),
_ => panic!(),
Dim1F32StreamItem::EventDataReadStats(stats) => {
MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats)
}
Dim1F32StreamItem::RangeComplete => MinMaxAvgScalarEventBatchStreamItem::RangeComplete,
}
}
}
@@ -516,6 +519,7 @@ pub enum MinMaxAvgScalarBinBatchStreamItem {
pub struct MinMaxAvgScalarEventBatchStreamItemAggregator {
agg: MinMaxAvgScalarEventBatchAggregator,
event_data_read_stats: EventDataReadStats,
range_complete: bool,
}
impl MinMaxAvgScalarEventBatchStreamItemAggregator {
@@ -524,6 +528,7 @@ impl MinMaxAvgScalarEventBatchStreamItemAggregator {
Self {
agg,
event_data_read_stats: EventDataReadStats::new(),
range_complete: false,
}
}
}
@@ -535,21 +540,21 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator {
fn ends_before(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_before(vals),
_ => todo!(),
_ => false,
}
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ends_after(vals),
_ => todo!(),
_ => false,
}
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.starts_after(vals),
_ => todo!(),
_ => false,
}
}
@@ -557,7 +562,7 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator {
match inp {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals),
MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats),
MinMaxAvgScalarEventBatchStreamItem::RangeComplete => panic!(),
MinMaxAvgScalarEventBatchStreamItem::RangeComplete => self.range_complete = true,
}
}
@@ -571,6 +576,9 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator {
ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(
self.event_data_read_stats,
));
if self.range_complete {
ret.push(MinMaxAvgScalarBinBatchStreamItem::RangeComplete);
}
ret
}
}
@@ -596,6 +604,7 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarEventBatchStreamItem {
pub struct MinMaxAvgScalarBinBatchStreamItemAggregator {
agg: MinMaxAvgScalarBinBatchAggregator,
event_data_read_stats: EventDataReadStats,
range_complete: bool,
}
impl MinMaxAvgScalarBinBatchStreamItemAggregator {
@@ -604,6 +613,7 @@ impl MinMaxAvgScalarBinBatchStreamItemAggregator {
Self {
agg,
event_data_read_stats: EventDataReadStats::new(),
range_complete: false,
}
}
}
@@ -615,21 +625,21 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
fn ends_before(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_before(vals),
_ => todo!(),
_ => false,
}
}
fn ends_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ends_after(vals),
_ => todo!(),
_ => false,
}
}
fn starts_after(&self, inp: &Self::InputValue) -> bool {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.starts_after(vals),
_ => todo!(),
_ => false,
}
}
@@ -637,7 +647,7 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
match inp {
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals),
MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats) => self.event_data_read_stats.trans(stats),
MinMaxAvgScalarBinBatchStreamItem::RangeComplete => panic!(),
MinMaxAvgScalarBinBatchStreamItem::RangeComplete => self.range_complete = true,
}
}
@@ -651,6 +661,9 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator {
ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(
self.event_data_read_stats,
));
if self.range_complete {
ret.push(MinMaxAvgScalarBinBatchStreamItem::RangeComplete);
}
ret
}
}

View File

@@ -143,8 +143,11 @@ impl PreBinnedValueStream {
let s1 = MergedFromRemotes::new(evq, self.node_config.node_config.cluster.clone());
let s2 = s1.into_binned_t(range).map(|k| match k {
Ok(MinMaxAvgScalarBinBatchStreamItem::Values(k)) => Ok(PreBinnedItem::Batch(k)),
Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete) => Ok(PreBinnedItem::RangeComplete),
Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats)) => {
Ok(PreBinnedItem::EventDataReadStats(stats))
}
Err(e) => Err(e),
_ => todo!(),
});
self.fut2 = Some(Box::pin(s2));
}

View File

@@ -160,6 +160,8 @@ where
ts_last_emit: u64,
range_complete_observed: Vec<bool>,
range_complete_observed_all: bool,
range_complete_observed_all_emitted: bool,
batch_size: usize,
}
impl<S> MergedMinMaxAvgScalarStream<S>
@@ -182,6 +184,8 @@ where
ts_last_emit: 0,
range_complete_observed: vec![false; n],
range_complete_observed_all: false,
range_complete_observed_all_emitted: false,
batch_size: 64,
}
}
}
@@ -202,33 +206,41 @@ where
self.completed = true;
return Ready(None);
}
if self.range_complete_observed_all {
error!("MERGER NOTE range_complete_observed_all");
if self.range_complete_observed_all_emitted {
// TODO something to do? Returning None is maybe too early if there is stats left.
} else {
error!("MERGER NOTE range_complete_observed_all EMIT NOW");
self.range_complete_observed_all_emitted = true;
return Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::RangeComplete)));
}
}
// can only run logic if all streams are either finished, errored or have some current value.
for i1 in 0..self.inps.len() {
match self.current[i1] {
MergedMinMaxAvgScalarStreamCurVal::None => {
match self.inps[i1].poll_next_unpin(cx) {
Ready(Some(Ok(k))) => {
match k {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => {
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(vals);
}
MinMaxAvgScalarEventBatchStreamItem::RangeComplete => {
self.range_complete_observed[i1] = true;
let d = self.range_complete_observed.iter().filter(|&&k| k).count();
if d == self.range_complete_observed.len() {
self.range_complete_observed_all = true;
info!("\n\n:::::: range_complete d {} COMPLETE", d);
} else {
info!("\n\n:::::: range_complete d {}", d);
}
// TODO what else to do here?
todo!();
}
MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_stats) => {
todo!();
}
Ready(Some(Ok(k))) => match k {
MinMaxAvgScalarEventBatchStreamItem::Values(vals) => {
self.ixs[i1] = 0;
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Val(vals);
}
}
MinMaxAvgScalarEventBatchStreamItem::RangeComplete => {
self.range_complete_observed[i1] = true;
let d = self.range_complete_observed.iter().filter(|&&k| k).count();
if d == self.range_complete_observed.len() {
self.range_complete_observed_all = true;
info!("\n\n:::::: range_complete d {} COMPLETE", d);
} else {
info!("\n\n:::::: range_complete d {}", d);
}
continue 'outer;
}
MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_stats) => {
todo!();
}
},
Ready(Some(Err(e))) => {
// TODO emit this error, consider this stream as done, anything more to do here?
//self.current[i1] = CurVal::Err(e);
@@ -239,7 +251,6 @@ where
self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::Finish;
}
Pending => {
// TODO is this behaviour correct?
return Pending;
}
}
@@ -290,21 +301,25 @@ where
self.batch.tss.push(lowest_ts);
let rix = self.ixs[lowest_ix];
let z = match &self.current[lowest_ix] {
MergedMinMaxAvgScalarStreamCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix]),
MergedMinMaxAvgScalarStreamCurVal::Val(k) => (k.mins[rix], k.maxs[rix], k.avgs[rix], k.tss.len()),
_ => panic!(),
};
self.batch.mins.push(z.0);
self.batch.maxs.push(z.1);
self.batch.avgs.push(z.2);
self.ixs[lowest_ix] += 1;
if self.ixs[lowest_ix] >= z.3 {
self.ixs[lowest_ix] = 0;
self.current[lowest_ix] = MergedMinMaxAvgScalarStreamCurVal::None;
}
}
if self.batch.tss.len() >= 64 {
if self.batch.tss.len() >= self.batch_size {
let mut k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty());
if self.range_complete_observed_all {
k.range_complete_observed = true;
}
let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k);
break Ready(Some(Ok(ret)));
return Ready(Some(Ok(ret)));
}
}
}