From 1f3197794f8f2714f88806a1ef48492679b21756 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 5 May 2021 14:59:34 +0200 Subject: [PATCH] RangeComplete is not propagated to the test --- disk/src/agg/binnedt.rs | 21 ++++++++++++++------- disk/src/cache/pbv.rs | 7 +++++-- disk/src/cache/pbvfs.rs | 2 +- disk/src/merge.rs | 7 +++++-- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 0d5aff6..5ae2403 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -110,16 +110,23 @@ where k } else if self.inp_completed { if self.range_complete { - self.range_complete_emitted = true; - // TODO why can't I declare that type? - //type TT = ::OutputValue; - if let Some(k) = ::OutputValue::make_range_complete_item() { - return Ready(Some(Ok(k))); - } else { - warn!("IntoBinnedTDefaultStream should emit RangeComplete but I doesn't have one"); + if self.range_complete_emitted { + self.completed = true; Ready(None) + } else { + self.range_complete_emitted = true; + // TODO why can't I declare that type? + //type TT = ::OutputValue; + if let Some(k) = ::OutputValue::make_range_complete_item() { + return Ready(Some(Ok(k))); + } else { + warn!("IntoBinnedTDefaultStream should emit RangeComplete but I doesn't have one"); + self.completed = true; + Ready(None) + } } } else { + self.completed = true; Ready(None) } } else { diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index bb81f33..d9609d0 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -161,6 +161,7 @@ impl Stream for PreBinnedValueStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + info!("PreBinnedValueStream poll_next ENTER"); if self.completed { panic!("PreBinnedValueStream poll_next on completed"); } @@ -168,7 +169,7 @@ impl Stream for PreBinnedValueStream { self.completed = true; return Ready(None); } - 'outer: loop { + let u = 'outer: loop { break if let Some(fut) = self.fut2.as_mut() { match fut.poll_next_unpin(cx) { Ready(Some(k)) => match k { @@ -217,6 +218,8 @@ impl Stream for PreBinnedValueStream { self.open_check_local_file = Some(Box::pin(fut)); continue 'outer; }; - } + }; + info!("PBV EXIT WITH {:?}", u); + u } } diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index ebf24e7..1cd91bc 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -44,7 +44,7 @@ impl PreBinnedValueFetchedStream { } } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub enum PreBinnedItem { Batch(MinMaxAvgScalarBinBatch), RangeComplete, diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 1ddbd51..47c98e2 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -258,6 +258,7 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + info!("Merger poll ENTER"); if self.completed { panic!("MergedMinMaxAvgScalarStream poll_next on completed"); } @@ -265,7 +266,7 @@ where self.completed = true; return Ready(None); } - 'outer: loop { + let u = 'outer: loop { break if self.data_emit_complete { error!("MERGER NOTE data_emit_complete"); if self.range_complete_observed_all { @@ -359,7 +360,9 @@ where Pending => Pending, } }; - } + }; + info!("Merger poll DONE"); + u } }