diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 0d5aff6..5ae2403 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -110,16 +110,23 @@ where k } else if self.inp_completed { if self.range_complete { - self.range_complete_emitted = true; - // TODO why can't I declare that type? - //type TT = ::OutputValue; - if let Some(k) = ::OutputValue::make_range_complete_item() { - return Ready(Some(Ok(k))); - } else { - warn!("IntoBinnedTDefaultStream should emit RangeComplete but I doesn't have one"); + if self.range_complete_emitted { + self.completed = true; Ready(None) + } else { + self.range_complete_emitted = true; + // TODO why can't I declare that type? + //type TT = ::OutputValue; + if let Some(k) = ::OutputValue::make_range_complete_item() { + return Ready(Some(Ok(k))); + } else { + warn!("IntoBinnedTDefaultStream should emit RangeComplete but I doesn't have one"); + self.completed = true; + Ready(None) + } } } else { + self.completed = true; Ready(None) } } else { diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index bb81f33..d9609d0 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -161,6 +161,7 @@ impl Stream for PreBinnedValueStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + info!("PreBinnedValueStream poll_next ENTER"); if self.completed { panic!("PreBinnedValueStream poll_next on completed"); } @@ -168,7 +169,7 @@ impl Stream for PreBinnedValueStream { self.completed = true; return Ready(None); } - 'outer: loop { + let u = 'outer: loop { break if let Some(fut) = self.fut2.as_mut() { match fut.poll_next_unpin(cx) { Ready(Some(k)) => match k { @@ -217,6 +218,8 @@ impl Stream for PreBinnedValueStream { self.open_check_local_file = Some(Box::pin(fut)); continue 'outer; }; - } + }; + info!("PBV EXIT WITH {:?}", u); + u } } diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index ebf24e7..1cd91bc 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -44,7 +44,7 @@ impl PreBinnedValueFetchedStream { } } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub enum PreBinnedItem { Batch(MinMaxAvgScalarBinBatch), RangeComplete, diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 1ddbd51..47c98e2 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -258,6 +258,7 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + info!("Merger poll ENTER"); if self.completed { panic!("MergedMinMaxAvgScalarStream poll_next on completed"); } @@ -265,7 +266,7 @@ where self.completed = true; return Ready(None); } - 'outer: loop { + let u = 'outer: loop { break if self.data_emit_complete { error!("MERGER NOTE data_emit_complete"); if self.range_complete_observed_all { @@ -359,7 +360,9 @@ where Pending => Pending, } }; - } + }; + info!("Merger poll DONE"); + u } }