From 073fde5fa8516cc532a9f41e553b81257375c2d6 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 7 May 2021 12:48:47 +0200 Subject: [PATCH] Fixes --- disk/src/agg.rs | 1 - disk/src/agg/binnedt.rs | 271 ++++++++++++++++++--------------- disk/src/agg/eventbatch.rs | 52 +++++-- disk/src/agg/scalarbinbatch.rs | 52 +++++-- disk/src/binnedstream.rs | 19 +-- disk/src/cache.rs | 24 ++- disk/src/cache/pbv.rs | 10 +- disk/src/cache/pbvfs.rs | 2 +- disk/src/eventchunker.rs | 8 +- disk/src/frame/inmem.rs | 3 +- disk/src/merge.rs | 2 +- disk/src/raw/bffr.rs | 14 +- disk/src/raw/conn.rs | 10 +- retrieval/src/test.rs | 4 +- 14 files changed, 265 insertions(+), 207 deletions(-) diff --git a/disk/src/agg.rs b/disk/src/agg.rs index 7aa9af9..9a99a3d 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -417,7 +417,6 @@ where }, EventChunkerItem::RangeComplete => Ready(Some(Ok(Dim1F32StreamItem::RangeComplete))), EventChunkerItem::EventDataReadStats(stats) => { - info!("++++++++ Dim1F32Stream stats {:?}", stats); let ret = Dim1F32StreamItem::EventDataReadStats(stats); Ready(Some(Ok(ret))) } diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index c1f4fed..6f9401f 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -4,7 +4,7 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use netpod::log::*; -use netpod::BinnedRange; +use netpod::{BinnedRange, EventDataReadStats}; use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; @@ -28,6 +28,9 @@ pub trait AggregatableTdim: Sized { fn is_log_item(&self) -> bool; fn log_item(self) -> Option; fn make_log_item(item: LogItem) -> Option; + fn is_stats_item(&self) -> bool; + fn stats_item(self) -> Option; + fn make_stats_item(item: EventDataReadStats) -> Option; } pub trait IntoBinnedT { @@ -57,20 +60,20 @@ where aggtor: Option, spec: BinnedRange, curbin: u32, - data_completed: bool, - range_complete: bool, + inp_completed: bool, + all_bins_emitted: bool, + range_complete_observed: bool, range_complete_emitted: bool, left: Option>>>, errored: bool, completed: bool, - inp_completed: bool, tmp_agg_results: VecDeque<::OutputValue>, } impl IntoBinnedTDefaultStream where I: AggregatableTdim, - S: Stream>, + S: Stream> + Unpin, { pub fn new(inp: S, spec: BinnedRange) -> Self { let range = spec.get_range(0); @@ -79,16 +82,119 @@ where aggtor: Some(I::aggregator_new_static(range.beg, range.end)), spec, curbin: 0, - data_completed: false, - range_complete: false, + inp_completed: false, + all_bins_emitted: false, + range_complete_observed: false, range_complete_emitted: false, left: None, errored: false, completed: false, - inp_completed: false, tmp_agg_results: VecDeque::new(), } } + + fn cur(&mut self, cx: &mut Context) -> Poll>> { + if let Some(cur) = self.left.take() { + cur + } else { + let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); + inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) + } + } + + fn cycle_current_bin(&mut self) { + self.curbin += 1; + let range = self.spec.get_range(self.curbin); + let ret = self + .aggtor + .replace(I::aggregator_new_static(range.beg, range.end)) + // TODO handle None case, or remove Option if Agg is always present + .unwrap() + .result(); + self.tmp_agg_results = ret.into(); + if self.curbin >= self.spec.count as u32 { + self.all_bins_emitted = true; + } + } + + fn handle( + &mut self, + cur: Poll>>, + ) -> Option::OutputValue, Error>>>> { + use Poll::*; + match cur { + Ready(Some(Ok(k))) => { + if k.is_range_complete() { + 
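+                    // Record the observation only; the RangeComplete output item
+                    // is emitted later, once the input is exhausted and all bins
+                    // have been produced (see poll_next below).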
self.range_complete_observed = true; + None + } else if k.is_log_item() { + if let Some(item) = k.log_item() { + if let Some(item) = ::OutputValue::make_log_item(item) { + Some(Ready(Some(Ok(item)))) + } else { + error!("IntoBinnedTDefaultStream can not create log item"); + None + } + } else { + error!("supposed to be log item but can't take it"); + None + } + } else if k.is_stats_item() { + if let Some(item) = k.stats_item() { + if let Some(item) = ::OutputValue::make_stats_item(item) { + Some(Ready(Some(Ok(item)))) + } else { + error!("IntoBinnedTDefaultStream can not create stats item"); + None + } + } else { + error!("supposed to be stats item but can't take it"); + None + } + } else if self.all_bins_emitted { + // Just drop the item because we will not emit anymore data. + // Could also at least gather some stats. + None + } else { + let ag = self.aggtor.as_mut().unwrap(); + if ag.ends_before(&k) { + None + } else if ag.starts_after(&k) { + self.left = Some(Ready(Some(Ok(k)))); + self.cycle_current_bin(); + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None + } else { + let mut k = k; + ag.ingest(&mut k); + let k = k; + if ag.ends_after(&k) { + self.left = Some(Ready(Some(Ok(k)))); + self.cycle_current_bin(); + } + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None + } + } + } + Ready(Some(Err(e))) => { + self.errored = true; + Some(Ready(Some(Err(e)))) + } + Ready(None) => { + // No more input, no more in leftover. + self.inp_completed = true; + if self.all_bins_emitted { + None + } else { + self.cycle_current_bin(); + // TODO cycle_current_bin enqueues the bin, can I return here instead? + None + } + } + Pending => Some(Pending), + } + } } impl Stream for IntoBinnedTDefaultStream @@ -101,132 +207,43 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - if self.completed { - panic!("IntoBinnedTDefaultStream poll_next on completed"); - } - if self.errored { - self.completed = true; - return Ready(None); - } + /* + Reconsider structure here: + I want to exhaust the input stream until it gives Ready(None) because there can be more Status or other new events. + The first time that I recognize that the requested data range is complete, I can set a flag. + After that, I can dismiss incoming data events. + */ 'outer: loop { - if let Some(item) = self.tmp_agg_results.pop_front() { - return Ready(Some(Ok(item))); - } else if self.data_completed { - if self.range_complete { - if self.range_complete_emitted { - self.completed = true; - return Ready(None); + break if self.completed { + panic!("IntoBinnedTDefaultStream poll_next on completed"); + } else if self.errored { + self.completed = true; + Ready(None) + } else if let Some(item) = self.tmp_agg_results.pop_front() { + Ready(Some(Ok(item))) + } else if self.range_complete_emitted { + self.completed = true; + Ready(None) + } else if self.inp_completed && self.all_bins_emitted { + self.range_complete_emitted = true; + if self.range_complete_observed { + // TODO why can't I declare that type alias? + //type TT = I; + if let Some(item) = ::OutputValue::make_range_complete_item() { + Ready(Some(Ok(item))) } else { - self.range_complete_emitted = true; - // TODO why can't I declare that type? 
- //type TT = ::OutputValue; - if let Some(item) = ::OutputValue::make_range_complete_item() { - return Ready(Some(Ok(item))); - } else { - warn!("IntoBinnedTDefaultStream should emit RangeComplete but it doesn't have one"); - self.completed = true; - return Ready(None); - } + warn!("IntoBinnedTDefaultStream should emit RangeComplete but it doesn't have one"); + continue 'outer; } } else { - self.completed = true; - return Ready(None); + continue 'outer; } - } - let cur = if let Some(k) = self.left.take() { - k - } else if self.inp_completed { - Ready(None) } else { - let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); - inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) - }; - break match cur { - Ready(Some(Ok(k))) => { - if k.is_range_complete() { - self.range_complete = true; - continue 'outer; - } else if k.is_log_item() { - if let Some(item) = k.log_item() { - if let Some(item) = - ::OutputValue::make_log_item(item.clone()) - { - Ready(Some(Ok(item))) - } else { - warn!("IntoBinnedTDefaultStream can not create log item"); - continue 'outer; - } - } else { - panic!() - } - } else { - let ag = self.aggtor.as_mut().unwrap(); - if ag.ends_before(&k) { - //info!("ENDS BEFORE"); - continue 'outer; - } else if ag.starts_after(&k) { - self.left = Some(Ready(Some(Ok(k)))); - self.curbin += 1; - let range = self.spec.get_range(self.curbin); - let ret = self - .aggtor - .replace(I::aggregator_new_static(range.beg, range.end)) - .unwrap() - .result(); - self.tmp_agg_results = ret.into(); - if self.curbin as u64 >= self.spec.count { - self.data_completed = true; - } - continue 'outer; - } else { - let mut k = k; - ag.ingest(&mut k); - let k = k; - if ag.ends_after(&k) { - self.left = Some(Ready(Some(Ok(k)))); - self.curbin += 1; - let range = self.spec.get_range(self.curbin); - let ret = self - .aggtor - .replace(I::aggregator_new_static(range.beg, range.end)) - .unwrap() - .result(); - self.tmp_agg_results = ret.into(); - if self.curbin as u64 >= self.spec.count { - self.data_completed = true; - } - continue 'outer; - } else { - continue 'outer; - } - } - } + let cur = self.cur(cx); + match self.handle(cur) { + Some(item) => item, + None => continue 'outer, } - Ready(Some(Err(e))) => { - self.errored = true; - Ready(Some(Err(e))) - } - Ready(None) => { - self.inp_completed = true; - if self.curbin as u64 >= self.spec.count { - self.data_completed = true; - continue 'outer; - } else { - self.curbin += 1; - let range = self.spec.get_range(self.curbin); - match self.aggtor.replace(I::aggregator_new_static(range.beg, range.end)) { - Some(ag) => { - let ret = ag.result(); - self.tmp_agg_results = ret.into(); - continue 'outer; - } - None => { - panic!(); - } - } - } - } - Pending => Pending, }; } } diff --git a/disk/src/agg/eventbatch.rs b/disk/src/agg/eventbatch.rs index 9ee2e32..3c9c34e 100644 --- a/disk/src/agg/eventbatch.rs +++ b/disk/src/agg/eventbatch.rs @@ -132,6 +132,18 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatch { fn make_log_item(_item: LogItem) -> Option { None } + + fn is_stats_item(&self) -> bool { + false + } + + fn stats_item(self) -> Option { + None + } + + fn make_stats_item(_item: EventDataReadStats) -> Option { + None + } } impl MinMaxAvgScalarEventBatch { @@ -298,20 +310,36 @@ impl AggregatableTdim for MinMaxAvgScalarEventBatchStreamItem { fn make_log_item(item: LogItem) -> Option { Some(MinMaxAvgScalarEventBatchStreamItem::Log(item)) } + + fn is_stats_item(&self) -> bool { + if let MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_) = self { + true + 
} else { + false + } + } + + fn stats_item(self) -> Option { + if let MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(item) = self { + Some(item) + } else { + None + } + } + + fn make_stats_item(item: EventDataReadStats) -> Option { + Some(MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(item)) + } } pub struct MinMaxAvgScalarEventBatchStreamItemAggregator { agg: MinMaxAvgScalarEventBatchAggregator, - event_data_read_stats: EventDataReadStats, } impl MinMaxAvgScalarEventBatchStreamItemAggregator { pub fn new(ts1: u64, ts2: u64) -> Self { let agg = ::aggregator_new_static(ts1, ts2); - Self { - agg, - event_data_read_stats: EventDataReadStats::new(), - } + Self { agg } } } @@ -343,25 +371,19 @@ impl AggregatorTdim for MinMaxAvgScalarEventBatchStreamItemAggregator { fn ingest(&mut self, inp: &mut Self::InputValue) { match inp { MinMaxAvgScalarEventBatchStreamItem::Values(vals) => self.agg.ingest(vals), - MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => { - info!("33333333333 2222222222222222222222 see stats {:?}", stats); - self.event_data_read_stats.trans(stats); - } - MinMaxAvgScalarEventBatchStreamItem::RangeComplete => {} - MinMaxAvgScalarEventBatchStreamItem::Log(_) => {} + MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(_) => panic!(), + MinMaxAvgScalarEventBatchStreamItem::RangeComplete => panic!(), + MinMaxAvgScalarEventBatchStreamItem::Log(_) => panic!(), } } fn result(self) -> Vec { - let mut ret: Vec<_> = self + let ret: Vec<_> = self .agg .result() .into_iter() .map(MinMaxAvgScalarBinBatchStreamItem::Values) .collect(); - ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats( - self.event_data_read_stats, - )); ret } } diff --git a/disk/src/agg/scalarbinbatch.rs b/disk/src/agg/scalarbinbatch.rs index 8aed020..d33bc17 100644 --- a/disk/src/agg/scalarbinbatch.rs +++ b/disk/src/agg/scalarbinbatch.rs @@ -214,6 +214,18 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatch { fn make_log_item(_item: LogItem) -> Option { None } + + fn is_stats_item(&self) -> bool { + false + } + + fn stats_item(self) -> Option { + None + } + + fn make_stats_item(_item: EventDataReadStats) -> Option { + None + } } pub struct MinMaxAvgScalarBinBatchAggregator { @@ -350,6 +362,26 @@ impl AggregatableTdim for MinMaxAvgScalarBinBatchStreamItem { fn make_log_item(item: LogItem) -> Option { Some(MinMaxAvgScalarBinBatchStreamItem::Log(item)) } + + fn is_stats_item(&self) -> bool { + if let MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(_) = self { + true + } else { + false + } + } + + fn stats_item(self) -> Option { + if let MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(item) = self { + Some(item) + } else { + None + } + } + + fn make_stats_item(item: EventDataReadStats) -> Option { + Some(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(item)) + } } impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem { @@ -362,16 +394,12 @@ impl AggregatableXdim1Bin for MinMaxAvgScalarBinBatchStreamItem { pub struct MinMaxAvgScalarBinBatchStreamItemAggregator { agg: MinMaxAvgScalarBinBatchAggregator, - event_data_read_stats: EventDataReadStats, } impl MinMaxAvgScalarBinBatchStreamItemAggregator { pub fn new(ts1: u64, ts2: u64) -> Self { let agg = ::aggregator_new_static(ts1, ts2); - Self { - agg, - event_data_read_stats: EventDataReadStats::new(), - } + Self { agg } } } @@ -403,25 +431,19 @@ impl AggregatorTdim for MinMaxAvgScalarBinBatchStreamItemAggregator { fn ingest(&mut self, inp: &mut Self::InputValue) { match inp { 
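+            // Only Values is expected to reach an aggregator: the time-binning
+            // layer (binnedt.rs) now intercepts stats, log and range-complete
+            // items before ingest, so the remaining arms are defensive panics.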
MinMaxAvgScalarBinBatchStreamItem::Values(vals) => self.agg.ingest(vals), - MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats) => { - info!("kkkkkkkkkkkkkkkkk 0000000000000000000 see stats {:?}", stats); - self.event_data_read_stats.trans(stats); - } - MinMaxAvgScalarBinBatchStreamItem::RangeComplete => {} - MinMaxAvgScalarBinBatchStreamItem::Log(_) => {} + MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(_) => panic!(), + MinMaxAvgScalarBinBatchStreamItem::RangeComplete => panic!(), + MinMaxAvgScalarBinBatchStreamItem::Log(_) => panic!(), } } fn result(self) -> Vec { - let mut ret: Vec<_> = self + let ret: Vec<_> = self .agg .result() .into_iter() .map(MinMaxAvgScalarBinBatchStreamItem::Values) .collect(); - ret.push(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats( - self.event_data_read_stats, - )); ret } } diff --git a/disk/src/binnedstream.rs b/disk/src/binnedstream.rs index 2b08c2b..634f3b6 100644 --- a/disk/src/binnedstream.rs +++ b/disk/src/binnedstream.rs @@ -37,14 +37,15 @@ impl BinnedStream { let node_config = node_config.clone(); move |patch| { let query = PreBinnedQuery::new(patch, channel.clone(), agg_kind.clone(), cache_usage.clone()); - PreBinnedValueFetchedStream::new(&query, &node_config) - } - }) - .filter_map(|k| match k { - Ok(k) => ready(Some(k)), - Err(e) => { - error!("{:?}", e); - ready(None) + let s: Pin + Send>> = + match PreBinnedValueFetchedStream::new(&query, &node_config) { + Ok(k) => Box::pin(k), + Err(e) => { + error!("see error {:?}", e); + Box::pin(futures_util::stream::iter(vec![Err(e)])) + } + }; + s } }) .flatten() @@ -65,7 +66,7 @@ impl BinnedStream { } Ok(PreBinnedItem::RangeComplete) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::RangeComplete)), Ok(PreBinnedItem::EventDataReadStats(stats)) => { - info!("BinnedStream observes stats {:?}", stats); + //info!("BinnedStream ''''''''''''''''''' observes stats {:?}", stats); Some(Ok(MinMaxAvgScalarBinBatchStreamItem::EventDataReadStats(stats))) } Ok(PreBinnedItem::Log(item)) => Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item))), diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 35a16a9..353a6c6 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -111,12 +111,18 @@ impl PreBinnedQuery { } pub fn make_query_string(&self) -> String { + let cache_usage = match self.cache_usage { + CacheUsage::Use => "use", + CacheUsage::Ignore => "ignore", + CacheUsage::Recreate => "recreate", + }; format!( - "{}&channel_backend={}&channel_name={}&agg_kind={:?}", + "{}&channel_backend={}&channel_name={}&agg_kind={:?}&cache_usage={}", self.patch.to_url_params_strings(), self.channel.backend, self.channel.name, - self.agg_kind + self.agg_kind, + cache_usage, ) } @@ -209,16 +215,6 @@ pub async fn binned_bytes_for_http( // TODO do I need to set up more transformations or binning to deliver the requested data? 
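+    // Pipeline: merge the per-node event streams, time-bin the merged stream
+    // over the requested range, then adapt it to the HTTP byte stream below.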
let s1 = MergedFromRemotes::new(evq, perf_opts, node_config.node_config.cluster.clone()); let s1 = s1.into_binned_t(range); - /*let s1 = s1.map(|k| { - use super::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem::*; - match k { - Ok(Values(k)) => Ok(PreBinnedItem::Batch(k)), - Ok(RangeComplete) => Ok(PreBinnedItem::RangeComplete), - Ok(EventDataReadStats(stats)) => Ok(PreBinnedItem::EventDataReadStats(stats)), - Ok(Log(item)) => Ok(PreBinnedItem::Log(item)), - Err(e) => Err(e), - } - });*/ let s1 = BinnedStreamFromMerged::new(Box::pin(s1))?; let ret = BinnedBytesForHttpStream::new(s1); Ok(Box::pin(ret)) @@ -445,12 +441,12 @@ impl Stream for MergedFromRemotes { Pending } else { if c1 == self.tcp_establish_futs.len() { - debug!("MergedFromRemotes SETTING UP MERGED STREAM"); + debug!("MergedFromRemotes setting up merged stream"); let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); let s1 = MergedMinMaxAvgScalarStream::new(inps); self.merged = Some(Box::pin(s1)); } else { - info!( + debug!( "MergedFromRemotes raw / estab {} {}", c1, self.tcp_establish_futs.len() diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs index 2714fe8..dd07f46 100644 --- a/disk/src/cache/pbv.rs +++ b/disk/src/cache/pbv.rs @@ -113,10 +113,7 @@ impl PreBinnedValueStream { match k { Ok(Values(k)) => Ok(PreBinnedItem::Batch(k)), Ok(RangeComplete) => Ok(PreBinnedItem::RangeComplete), - Ok(EventDataReadStats(stats)) => { - info!("PreBinnedValueStream ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ stats {:?}", stats); - Ok(PreBinnedItem::EventDataReadStats(stats)) - } + Ok(EventDataReadStats(stats)) => Ok(PreBinnedItem::EventDataReadStats(stats)), Ok(Log(item)) => Ok(PreBinnedItem::Log(item)), Err(e) => Err(e), } @@ -174,7 +171,6 @@ impl PreBinnedValueStream { } fn try_setup_fetch_prebinned_higher_res(&mut self) { - trace!("try_setup_fetch_prebinned_higher_res for {:?}", self.query.patch); let range = self.query.patch.patch_range(); match PreBinnedPatchRange::covering_range(range, self.query.patch.bin_count() + 1) { Some(range) => { @@ -295,7 +291,9 @@ impl Stream for PreBinnedValueStream { Ready(Some(Ok(PreBinnedItem::Batch(batch)))) } Ok(PreBinnedItem::EventDataReadStats(stats)) => { - info!("PreBinnedValueStream as Stream seeing stats {:?}", stats); + if false { + info!("PreBinnedValueStream ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ ✙ stats {:?}", stats); + } Ready(Some(Ok(PreBinnedItem::EventDataReadStats(stats)))) } Ok(PreBinnedItem::Log(item)) => Ready(Some(Ok(PreBinnedItem::Log(item)))), diff --git a/disk/src/cache/pbvfs.rs b/disk/src/cache/pbvfs.rs index 2f4261a..08be953 100644 --- a/disk/src/cache/pbvfs.rs +++ b/disk/src/cache/pbvfs.rs @@ -73,7 +73,7 @@ impl Stream for PreBinnedValueFetchedStream { Ready(Some(Ok(frame))) => match decode_frame::>(&frame) { Ok(Ok(item)) => { match &item { - PreBinnedItem::EventDataReadStats(stats) => { + PreBinnedItem::EventDataReadStats(stats) if false => { info!("PreBinnedValueFetchedStream ✕ ✕ ✕ ✕ ✕ ✕ ✕ ✕ ✕ stats {:?}", stats); } _ => {} diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index 7f936e0..b76eee3 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -59,7 +59,7 @@ impl EventChunker { data_emit_complete: false, final_stats_sent: false, data_since_last_stats: 0, - stats_emit_interval: 1, + stats_emit_interval: 64, parsed_bytes: 0, } } @@ -84,7 +84,7 @@ impl EventChunker { data_emit_complete: false, final_stats_sent: false, data_since_last_stats: 0, - stats_emit_interval: 1, + stats_emit_interval: 64, parsed_bytes: 0, } } @@ -324,7 +324,6 @@ 
impl Stream for EventChunker { parsed_bytes: self.parsed_bytes, }; self.parsed_bytes = 0; - warn!("EMIT FINAL STATS {:?}", item); let ret = EventChunkerItem::EventDataReadStats(item); self.final_stats_sent = true; Ready(Some(Ok(ret))) @@ -349,8 +348,7 @@ impl Stream for EventChunker { self.inp.set_need_min(x); self.data_since_last_stats += 1; let ret = EventChunkerItem::Events(res.events); - let ret = Ok(ret); - Ready(Some(ret)) + Ready(Some(Ok(ret))) } } Err(e) => { diff --git a/disk/src/frame/inmem.rs b/disk/src/frame/inmem.rs index 7eed396..61f654f 100644 --- a/disk/src/frame/inmem.rs +++ b/disk/src/frame/inmem.rs @@ -283,7 +283,8 @@ where if n1 == 0 { let n2 = self.buf.len(); if n2 != 0 { - warn!( + // TODO anything more to handle here? + debug!( "InMemoryFrameAsyncReadStream n2 != 0 n2 {} consumed {}", n2, self.inp_bytes_consumed ); diff --git a/disk/src/merge.rs b/disk/src/merge.rs index ee9003a..84079f7 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -216,7 +216,7 @@ where let d = self.range_complete_observed.iter().filter(|&&k| k).count(); if d == self.range_complete_observed.len() { self.range_complete_observed_all = true; - info!("MergedMinMaxAvgScalarStream range_complete d {} COMPLETE", d); + debug!("MergedMinMaxAvgScalarStream range_complete d {} COMPLETE", d); } else { trace!("MergedMinMaxAvgScalarStream range_complete d {}", d); } diff --git a/disk/src/raw/bffr.rs b/disk/src/raw/bffr.rs index cfc4dce..38e9def 100644 --- a/disk/src/raw/bffr.rs +++ b/disk/src/raw/bffr.rs @@ -53,12 +53,14 @@ where match decode_frame::(&frame) { Ok(item) => match item { Ok(item) => { - match &item { - MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => { - info!("✒✒ ✒✒ ✒✒ ✒✒ ✒✒ ✒✒ stats {:?}", stats); - } - _ => { - info!("✒ ✒ ✒ ✒ other kind") + if false { + match &item { + MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats) => { + info!("✒✒ ✒✒ ✒✒ ✒✒ ✒✒ ✒✒ stats {:?}", stats); + } + _ => { + info!("✒ ✒ ✒ ✒ other kind") + } } } Ready(Some(Ok(item))) diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index c4fbc57..3640731 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -172,11 +172,13 @@ async fn raw_conn_handler_inner_try( .into_dim_1_f32_stream() .into_binned_x_bins_1() .map(|k| { - match &k { - Ok(MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats)) => { - info!("raw::conn ✑ ✑ ✑ ✑ ✑ ✑ seeing stats: {:?}", stats); + if false { + match &k { + Ok(MinMaxAvgScalarEventBatchStreamItem::EventDataReadStats(stats)) => { + info!("raw::conn ✑ ✑ ✑ ✑ ✑ ✑ seeing stats: {:?}", stats); + } + _ => {} } - _ => {} } k }); diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 0efeaa4..5c15124 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -50,8 +50,8 @@ async fn get_binned_0_inner() -> Result<(), Error> { get_binned_channel( "wave-f64-be-n21", "1970-01-01T00:20:10.000Z", - "1970-01-01T00:20:20.000Z", - 1, + "1970-01-01T00:20:30.000Z", + 2, &cluster, ) .await?;
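
Note on the make_query_string fix in disk/src/cache.rs above: the query string
previously omitted cache_usage, so a peer node reconstructing the
PreBinnedQuery never saw the requested cache policy. The serialized value has
to round-trip with whatever the receiving side parses; the sketch below models
that mapping. It is illustrative only: CacheUsage is re-declared locally and
parse_cache_usage is a hypothetical inverse, not part of this patch.

    #[derive(Clone, Debug, PartialEq)]
    enum CacheUsage {
        Use,
        Ignore,
        Recreate,
    }

    impl CacheUsage {
        // Mirrors the match added to make_query_string in this patch.
        fn as_str(&self) -> &'static str {
            match self {
                CacheUsage::Use => "use",
                CacheUsage::Ignore => "ignore",
                CacheUsage::Recreate => "recreate",
            }
        }
    }

    // Hypothetical inverse for the node that receives the query.
    fn parse_cache_usage(s: &str) -> Result<CacheUsage, String> {
        match s {
            "use" => Ok(CacheUsage::Use),
            "ignore" => Ok(CacheUsage::Ignore),
            "recreate" => Ok(CacheUsage::Recreate),
            _ => Err(format!("unrecognized cache_usage value: {}", s)),
        }
    }

    fn main() {
        // Round-trip check over all variants.
        for c in [CacheUsage::Use, CacheUsage::Ignore, CacheUsage::Recreate] {
            assert_eq!(parse_cache_usage(c.as_str()).unwrap(), c);
        }
    }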