RangeComplete is not propagated to the test

This commit is contained in:
Dominik Werder
2021-05-05 14:59:34 +02:00
parent 226c2ac6f3
commit 1f3197794f
4 changed files with 25 additions and 12 deletions

View File

@@ -110,16 +110,23 @@ where
k
} else if self.inp_completed {
if self.range_complete {
self.range_complete_emitted = true;
// TODO why can't I declare that type?
//type TT = <I::Aggregator as AggregatorTdim>::OutputValue;
if let Some(k) = <I::Aggregator as AggregatorTdim>::OutputValue::make_range_complete_item() {
return Ready(Some(Ok(k)));
} else {
warn!("IntoBinnedTDefaultStream should emit RangeComplete but I doesn't have one");
if self.range_complete_emitted {
self.completed = true;
Ready(None)
} else {
self.range_complete_emitted = true;
// TODO why can't I declare that type?
//type TT = <I::Aggregator as AggregatorTdim>::OutputValue;
if let Some(k) = <I::Aggregator as AggregatorTdim>::OutputValue::make_range_complete_item() {
return Ready(Some(Ok(k)));
} else {
warn!("IntoBinnedTDefaultStream should emit RangeComplete but I doesn't have one");
self.completed = true;
Ready(None)
}
}
} else {
self.completed = true;
Ready(None)
}
} else {

View File

@@ -161,6 +161,7 @@ impl Stream for PreBinnedValueStream {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
info!("PreBinnedValueStream poll_next ENTER");
if self.completed {
panic!("PreBinnedValueStream poll_next on completed");
}
@@ -168,7 +169,7 @@ impl Stream for PreBinnedValueStream {
self.completed = true;
return Ready(None);
}
'outer: loop {
let u = 'outer: loop {
break if let Some(fut) = self.fut2.as_mut() {
match fut.poll_next_unpin(cx) {
Ready(Some(k)) => match k {
@@ -217,6 +218,8 @@ impl Stream for PreBinnedValueStream {
self.open_check_local_file = Some(Box::pin(fut));
continue 'outer;
};
}
};
info!("PBV EXIT WITH {:?}", u);
u
}
}

View File

@@ -44,7 +44,7 @@ impl PreBinnedValueFetchedStream {
}
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub enum PreBinnedItem {
Batch(MinMaxAvgScalarBinBatch),
RangeComplete,

View File

@@ -258,6 +258,7 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
info!("Merger poll ENTER");
if self.completed {
panic!("MergedMinMaxAvgScalarStream poll_next on completed");
}
@@ -265,7 +266,7 @@ where
self.completed = true;
return Ready(None);
}
'outer: loop {
let u = 'outer: loop {
break if self.data_emit_complete {
error!("MERGER NOTE data_emit_complete");
if self.range_complete_observed_all {
@@ -359,7 +360,9 @@ where
Pending => Pending,
}
};
}
};
info!("Merger poll DONE");
u
}
}