From c6b9ba71540dfb200a07cd68f184cace5d49779d Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 29 Sep 2021 23:21:38 +0200 Subject: [PATCH] Fix test --- disk/src/eventblobs.rs | 25 +++++---- disk/src/lib.rs | 1 + disk/src/merge.rs | 84 ++-------------------------- disk/src/rangefilter.rs | 119 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 141 insertions(+), 88 deletions(-) create mode 100644 disk/src/rangefilter.rs diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 5b860d2..b87bf6a 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -81,6 +81,7 @@ impl EventChunkerMultifile { self.seen_after_range_count } + // TODO remove or use Drop impl? pub fn close(&mut self) { if let Some(evs) = &mut self.evs { self.seen_before_range_count += evs.seen_before_range_count(); @@ -220,6 +221,8 @@ impl Stream for EventChunkerMultifile { #[cfg(test)] mod test { + use crate::merge::MergedStream; + use crate::rangefilter::RangeFilter; use crate::{eventblobs::EventChunkerMultifile, eventchunker::EventChunkerConf}; use err::Error; use futures_util::StreamExt; @@ -252,8 +255,8 @@ mod test { }; let task = async move { let mut event_count = 0; - let mut events = EventChunkerMultifile::new( - range, + let events = EventChunkerMultifile::new( + range.clone(), channel_config, node, nodeix, @@ -262,6 +265,8 @@ mod test { true, true, ); + //let mut events = MergedStream::new(vec![events], range.clone(), true); + let mut events = RangeFilter::new(events, range.clone(), true); let mut tss = vec![]; while let Some(item) = events.next().await { match item { @@ -281,7 +286,6 @@ mod test { Err(e) => return Err(e.into()), } } - events.close(); Ok((event_count, tss)) }; Ok(taskrun::run(task).unwrap()) @@ -298,6 +302,7 @@ mod test { if res.0 != 3 { Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?; } + assert_eq!(res.1, vec![DAY - MS * 1500, DAY, DAY + MS * 1500]); Ok(()) } @@ -308,9 +313,10 @@ mod test { end: DAY + MS * 1501, }; let res = read_expanded_for_range(range, 0)?; - if res.0 != 3 { + if res.0 != 4 { Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?; } + assert_eq!(res.1, vec![DAY - MS * 1500, DAY, DAY + MS * 1500, DAY + MS * 3000]); Ok(()) } @@ -321,9 +327,7 @@ mod test { end: DAY + MS * 1501, }; let res = read_expanded_for_range(range, 0)?; - if res.0 != 3 { - Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?; - } + assert_eq!(res.1, vec![DAY - MS * 1500, DAY, DAY + MS * 1500, DAY + MS * 3000]); Ok(()) } @@ -335,9 +339,10 @@ mod test { end: DAY + MS * 1501, }; let res = read_expanded_for_range(range, 0)?; - if res.0 != 4 { - Err(Error::with_msg(format!("unexpected number of events: {}", res.0)))?; - } + assert_eq!( + res.1, + vec![DAY - MS * 3000, DAY - MS * 1500, DAY, DAY + MS * 1500, DAY + MS * 3000] + ); Ok(()) } } diff --git a/disk/src/lib.rs b/disk/src/lib.rs index c427ed6..7da788f 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -34,6 +34,7 @@ pub mod index; pub mod merge; pub mod mergeblobs; pub mod paths; +pub mod rangefilter; pub mod raw; pub mod streamlog; diff --git a/disk/src/merge.rs b/disk/src/merge.rs index b5f797c..3511ecf 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -30,8 +30,6 @@ pub struct MergedStream { errored: bool, completed: bool, batch: ITY, - range: NanoRange, - expand: bool, ts_last_emit: u64, range_complete_observed: Vec, range_complete_observed_all: bool, @@ -39,8 +37,6 @@ pub struct MergedStream { data_emit_complete: bool, batch_size: ByteSize, batch_len_emit_histo: HistoLog2, - emitted_after_range: usize, - pre_range_buf: ITY, logitems: VecDeque, event_data_read_stats_items: VecDeque, } @@ -70,8 +66,6 @@ where errored: false, completed: false, batch: ::empty(), - range, - expand, ts_last_emit: 0, range_complete_observed: vec![false; n], range_complete_observed_all: false, @@ -79,8 +73,6 @@ where data_emit_complete: false, batch_size: ByteSize::kb(128), batch_len_emit_histo: HistoLog2::new(0), - emitted_after_range: 0, - pre_range_buf: ITY::empty(), logitems: VecDeque::new(), event_data_read_stats_items: VecDeque::new(), } @@ -154,7 +146,7 @@ where impl Stream for MergedStream where S: Stream> + Unpin, - ITY: PushableIndex + Appendable + Clearable + ByteEstimate + WithTimestamps + Unpin, + ITY: PushableIndex + Appendable + ByteEstimate + WithTimestamps + Unpin, { type Item = Sitemty; @@ -205,21 +197,8 @@ where } } if lowest_ix == usize::MAX { - if self.pre_range_buf.len() == 1 { - let mut ldst = std::mem::replace(&mut self.batch, ITY::empty()); - let ts4 = self.pre_range_buf.ts(0); - info!("\n\nMERGER enqueue after exhausted from stash {}", ts4); - ldst.push_index(&self.pre_range_buf, 0); - self.pre_range_buf.clear(); - self.ts_last_emit = ts4; - self.batch = ldst; - } else if self.pre_range_buf.len() > 1 { - panic!(); - } else { - }; if self.batch.len() != 0 { - let emp = ITY::empty(); - let ret = std::mem::replace(&mut self.batch, emp); + let ret = std::mem::replace(&mut self.batch, ITY::empty()); self.batch_len_emit_histo.ingest(ret.len() as u32); self.data_emit_complete = true; if LOG_EMIT_ITEM { @@ -236,58 +215,8 @@ where } } else { assert!(lowest_ts >= self.ts_last_emit); - let do_emit_event; - let emit_packet_now_2; - if lowest_ts < self.range.beg { - do_emit_event = false; - emit_packet_now_2 = false; - if self.expand { - info!("\n\nMERGER stash {}", lowest_ts); - let mut ldst = std::mem::replace(&mut self.pre_range_buf, ITY::empty()); - ldst.clear(); - let rix = self.ixs[lowest_ix]; - match &self.current[lowest_ix] { - MergedCurVal::Val(val) => ldst.push_index(val, rix), - MergedCurVal::None => panic!(), - MergedCurVal::Finish => panic!(), - } - self.pre_range_buf = ldst; - } else { - }; - } else if lowest_ts >= self.range.end { - if self.expand { - if self.emitted_after_range == 0 { - do_emit_event = true; - emit_packet_now_2 = true; - self.emitted_after_range += 1; - self.range_complete_observed_all = true; - self.data_emit_complete = true; - } else { - do_emit_event = false; - emit_packet_now_2 = false; - }; - } else { - do_emit_event = false; - emit_packet_now_2 = false; - self.data_emit_complete = true; - }; - } else { - do_emit_event = true; - emit_packet_now_2 = false; - }; - if do_emit_event { + { let mut ldst = std::mem::replace(&mut self.batch, ITY::empty()); - if self.pre_range_buf.len() == 1 { - let ts4 = self.pre_range_buf.ts(0); - info!("\n\nMERGER enqueue from stash {}", ts4); - ldst.push_index(&self.pre_range_buf, 0); - self.pre_range_buf.clear(); - } else if self.pre_range_buf.len() > 1 { - panic!(); - } else { - info!("\n\nMERGER nothing in stash"); - }; - info!("\n\nMERGER enqueue {}", lowest_ts); self.ts_last_emit = lowest_ts; let rix = self.ixs[lowest_ix]; match &self.current[lowest_ix] { @@ -309,11 +238,10 @@ where self.ixs[lowest_ix] = 0; self.current[lowest_ix] = MergedCurVal::None; } - let emit_packet_now; - if emit_packet_now_2 || self.batch.byte_estimate() >= self.batch_size.bytes() as u64 { - emit_packet_now = true; + let emit_packet_now = if self.batch.byte_estimate() >= self.batch_size.bytes() as u64 { + true } else { - emit_packet_now = false; + false }; if emit_packet_now { trace!("emit item because over threshold len {}", self.batch.len()); diff --git a/disk/src/rangefilter.rs b/disk/src/rangefilter.rs new file mode 100644 index 0000000..bbfef7a --- /dev/null +++ b/disk/src/rangefilter.rs @@ -0,0 +1,119 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_core::Stream; +use futures_util::StreamExt; +use items::{Appendable, Clearable, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, WithTimestamps}; +use netpod::NanoRange; + +pub struct RangeFilter { + inp: S, + range: NanoRange, + expand: bool, + prerange: ITY, + have_pre: bool, + emitted_pre: bool, + emitted_post: bool, + done: bool, + complete: bool, +} + +impl RangeFilter +where + ITY: Appendable, +{ + pub fn new(inp: S, range: NanoRange, expand: bool) -> Self { + Self { + inp, + range, + expand, + prerange: ITY::empty(), + have_pre: false, + emitted_pre: false, + emitted_post: false, + done: false, + complete: false, + } + } +} + +impl Stream for RangeFilter +where + S: Stream> + Unpin, + ITY: WithTimestamps + PushableIndex + Appendable + Clearable + Unpin, +{ + type Item = Sitemty; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + if self.complete { + panic!("poll_next on complete"); + } else if self.done { + self.complete = true; + Ready(None) + } else { + match self.inp.poll_next_unpin(cx) { + Ready(Some(item)) => match item { + Ok(StreamItem::DataItem(RangeCompletableItem::Data(item))) => { + let mut ret = ITY::empty(); + for i1 in 0..item.len() { + let ts = item.ts(i1); + if ts < self.range.beg { + if self.expand { + self.prerange.clear(); + self.prerange.push_index(&item, i1); + self.have_pre = true; + } else { + }; + } else if ts >= self.range.end { + if self.expand { + if self.have_pre { + ret.push_index(&self.prerange, 0); + self.prerange.clear(); + self.have_pre = false; + }; + if !self.emitted_post { + self.emitted_post = true; + ret.push_index(&item, i1); + self.done = true; + } else { + panic!(); + }; + } else { + self.done = true; + }; + } else { + if self.expand { + if self.have_pre { + ret.push_index(&self.prerange, 0); + self.prerange.clear(); + self.have_pre = false; + } + ret.push_index(&item, i1); + } else { + ret.push_index(&item, i1); + }; + }; + } + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) + } + k => Ready(Some(k)), + }, + Ready(None) => { + if self.have_pre { + let mut ret = ITY::empty(); + ret.push_index(&self.prerange, 0); + self.have_pre = false; + self.done = true; + Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))))) + } else { + self.done = true; + self.complete = true; + Ready(None) + } + } + Pending => Pending, + } + } + } +}