From 1ae5c3dc800bb86dc3b0ccf49ffd21fcbf5d1709 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 5 May 2021 16:03:46 +0200 Subject: [PATCH] Seems to work on remote --- disk/src/agg/binnedt.rs | 47 ++++++++++++++++++++-------------------- disk/src/binnedstream.rs | 6 +++-- disk/src/cache/pbv.rs | 7 ++---- disk/src/eventchunker.rs | 2 -- disk/src/merge.rs | 46 +++++++++++++-------------------------- 5 files changed, 45 insertions(+), 63 deletions(-) diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 5ae2403..7dcde8f 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -53,6 +53,7 @@ where aggtor: Option, spec: BinnedRange, curbin: u32, + data_completed: bool, range_complete: bool, range_complete_emitted: bool, left: Option>>>, @@ -74,6 +75,7 @@ where aggtor: Some(I::aggregator_new_static(range.beg, range.end)), spec, curbin: 0, + data_completed: false, range_complete: false, range_complete_emitted: false, left: None, @@ -105,30 +107,30 @@ where 'outer: loop { if let Some(item) = self.tmp_agg_results.pop_front() { return Ready(Some(Ok(item))); + } else if self.data_completed { + if self.range_complete { + if self.range_complete_emitted { + self.completed = true; + return Ready(None); + } else { + self.range_complete_emitted = true; + warn!("IntoBinnedTDefaultStream /////////////////////////////////// emit now RangeComplete"); + // TODO why can't I declare that type? + //type TT = ::OutputValue; + if let Some(item) = ::OutputValue::make_range_complete_item() { + return Ready(Some(Ok(item))); + } else { + warn!("IntoBinnedTDefaultStream should emit RangeComplete but it doesn't have one"); + self.completed = true; + return Ready(None); + } + } + } } let cur = if let Some(k) = self.left.take() { k } else if self.inp_completed { - if self.range_complete { - if self.range_complete_emitted { - self.completed = true; - Ready(None) - } else { - self.range_complete_emitted = true; - // TODO why can't I declare that type? - //type TT = ::OutputValue; - if let Some(k) = ::OutputValue::make_range_complete_item() { - return Ready(Some(Ok(k))); - } else { - warn!("IntoBinnedTDefaultStream should emit RangeComplete but I doesn't have one"); - self.completed = true; - Ready(None) - } - } - } else { - self.completed = true; - Ready(None) - } + Ready(None) } else { let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) @@ -190,15 +192,14 @@ where self.inp_completed = true; if self.curbin as u64 >= self.spec.count { warn!("IntoBinnedTDefaultStream curbin out of spec, END"); - self.completed = true; - Ready(None) + self.data_completed = true; + continue 'outer; } else { self.curbin += 1; let range = self.spec.get_range(self.curbin); match self.aggtor.replace(I::aggregator_new_static(range.beg, range.end)) { Some(ag) => { let ret = ag.result(); - //Ready(Some(Ok(ag.result()))) self.tmp_agg_results = ret.into(); continue 'outer; } diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index bf78ca5..0a81ed4 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -61,7 +61,10 @@ impl BinnedStream { _ => None, } } - Ok(PreBinnedItem::RangeComplete) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)), + Ok(PreBinnedItem::RangeComplete) => { + info!("=================== BINNED STREAM OBSERVES RangeComplete ===================="); + Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)) + } Ok(PreBinnedItem::EventDataReadStats(stats)) => { Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats))) } @@ -73,7 +76,6 @@ impl BinnedStream { ready(g) } }) - //.map(|k| k) .into_binned_t(range); Self { inp: Box::pin(inp) } } diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index d9609d0..bb81f33 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -161,7 +161,6 @@ impl Stream for PreBinnedValueStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - info!("PreBinnedValueStream poll_next ENTER"); if self.completed { panic!("PreBinnedValueStream poll_next on completed"); } @@ -169,7 +168,7 @@ impl Stream for PreBinnedValueStream { self.completed = true; return Ready(None); } - let u = 'outer: loop { + 'outer: loop { break if let Some(fut) = self.fut2.as_mut() { match fut.poll_next_unpin(cx) { Ready(Some(k)) => match k { @@ -218,8 +217,6 @@ impl Stream for PreBinnedValueStream { self.open_check_local_file = Some(Box::pin(fut)); continue 'outer; }; - }; - info!("PBV EXIT WITH {:?}", u); - u + } } } diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index d028b4e..6c0fa74 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -127,8 +127,6 @@ impl EventChunker { let pulse = sl.read_i64::().unwrap() as u64; if ts >= self.range.end { self.seen_beyond_range = true; - // TODO ret.end_of_range_observed = true; - info!("END OF RANGE OBSERVED"); break; } if ts < self.range.beg { diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 47c98e2..02e3056 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -213,9 +213,9 @@ where let d = self.range_complete_observed.iter().filter(|&&k| k).count(); if d == self.range_complete_observed.len() { self.range_complete_observed_all = true; - info!("\n\n:::::: range_complete d {} COMPLETE", d); + info!("MergedMinMaxAvgScalarStream range_complete d {} COMPLETE", d); } else { - info!("\n\n:::::: range_complete d {}", d); + trace!("MergedMinMaxAvgScalarStream range_complete d {}", d); } continue 'l1; } @@ -258,7 +258,6 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - info!("Merger poll ENTER"); if self.completed { panic!("MergedMinMaxAvgScalarStream poll_next on completed"); } @@ -266,20 +265,14 @@ where self.completed = true; return Ready(None); } - let u = 'outer: loop { + 'outer: loop { break if self.data_emit_complete { - error!("MERGER NOTE data_emit_complete"); if self.range_complete_observed_all { - error!("MERGER NOTE range_complete_observed_all"); if self.range_complete_observed_all_emitted { - error!("MERGER NOTE range_complete_observed_all_emitted"); - // NOTE everything else (data and stats) must be emitted before data_emit_complete gets set. self.completed = true; Ready(None) } else { - error!("MERGER NOTE range_complete_observed_all EMIT NOW"); self.range_complete_observed_all_emitted = true; - // NOTE this is supposed to return Ready(Some(Ok(MinMaxAvgScalarEventBatchStreamItem::RangeComplete))) } } else { @@ -293,39 +286,32 @@ where let mut lowest_ix = usize::MAX; let mut lowest_ts = u64::MAX; for i1 in 0..self.inps.len() { - match &self.current[i1] { - MergedMinMaxAvgScalarStreamCurVal::Finish => {} - MergedMinMaxAvgScalarStreamCurVal::Val(val) => { - let u = self.ixs[i1]; - if u >= val.tss.len() { - self.ixs[i1] = 0; - self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::None; - continue 'outer; - } else { - let ts = val.tss[u]; - if ts < lowest_ts { - lowest_ix = i1; - lowest_ts = ts; - } + if let MergedMinMaxAvgScalarStreamCurVal::Val(val) = &self.current[i1] { + let u = self.ixs[i1]; + if u >= val.tss.len() { + self.ixs[i1] = 0; + self.current[i1] = MergedMinMaxAvgScalarStreamCurVal::None; + continue 'outer; + } else { + let ts = val.tss[u]; + if ts < lowest_ts { + lowest_ix = i1; + lowest_ts = ts; } } - _ => panic!(), } } if lowest_ix == usize::MAX { if self.batch.tss.len() != 0 { let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); - info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(Some( current batch ))"); let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); self.data_emit_complete = true; Ready(Some(Ok(ret))) } else { - info!("MergedMinMaxAvgScalarStream no more lowest emit Ready(None)"); self.data_emit_complete = true; continue 'outer; } } else { - //trace!("decided on next lowest ts {} ix {}", lowest_ts, lowest_ix); assert!(lowest_ts >= self.ts_last_emit); self.ts_last_emit = lowest_ts; self.batch.tss.push(lowest_ts); @@ -360,9 +346,7 @@ where Pending => Pending, } }; - }; - info!("Merger poll DONE"); - u + } } }